HDDS-10787. Use ManagedObject in rocksdb-checkpoint-differ instead of RocksObject #6625

Merged · 1 commit · May 3, 2024
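
Overview of the change: ManagedSstFileReader previously wrapped a raw SstFileReader as a ManagedObject<SstFileReader>, so every call site had to unwrap it with .get(); it now extends SstFileReader directly and keeps leak tracking internal. RocksDBCheckpointDiffer likewise switches from raw org.rocksdb types (RocksDB, DBOptions) to their Managed counterparts. A minimal before/after sketch of a reader call site (assuming a ManagedOptions named options is in scope; the SST path is hypothetical, and RocksDBException handling is elided):

    // Before: managed wrapper around a raw reader, unwrapped with get()
    try (ManagedSstFileReader reader =
        ManagedSstFileReader.managed(new SstFileReader(options))) {
      reader.get().open("/tmp/example.sst");
      long keys = reader.get().getTableProperties().getNumEntries();
    }

    // After: ManagedSstFileReader is-a SstFileReader, used directly
    try (ManagedSstFileReader reader = new ManagedSstFileReader(options)) {
      reader.open("/tmp/example.sst");
      long keys = reader.getTableProperties().getNumEntries();
    }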
@@ -163,7 +163,7 @@ public RDBStore(File dbFile, ManagedDBOptions dbOptions, ManagedStatistics stati
rocksDBCheckpointDiffer.setCompactionLogTableCFHandle(
compactionLogTableCF.getHandle());
// Set activeRocksDB in differ to access compaction log CF.
-    rocksDBCheckpointDiffer.setActiveRocksDB(db.getManagedRocksDb().get());
+    rocksDBCheckpointDiffer.setActiveRocksDB(db.getManagedRocksDb());
// Load all previous compaction logs
rocksDBCheckpointDiffer.loadAllCompactionLogs();
}
@@ -18,18 +18,29 @@
*/
package org.apache.hadoop.hdds.utils.db.managed;

+import org.apache.ratis.util.UncheckedAutoCloseable;
+import org.rocksdb.Options;
import org.rocksdb.SstFileReader;

+import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils.track;
+
/**
 * Managed SstFileReader.
 */
-public class ManagedSstFileReader extends ManagedObject<SstFileReader> {
+public class ManagedSstFileReader extends SstFileReader {
+
+  private final UncheckedAutoCloseable leakTracker = track(this);

-  ManagedSstFileReader(SstFileReader original) {
-    super(original);
+  public ManagedSstFileReader(final Options options) {
+    super(options);
  }

-  public static ManagedSstFileReader managed(SstFileReader reader) {
-    return new ManagedSstFileReader(reader);
+  @Override
+  public void close() {
+    try {
+      super.close();
+    } finally {
+      leakTracker.close();
+    }
  }
}
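
The override order matters here: super.close() releases the native SstFileReader handle first, and leakTracker.close() runs in the finally block, so the tracker is deregistered even if the native close throws. A minimal caller sketch (the SST path is hypothetical; RocksDBException handling elided):

    try (ManagedOptions options = new ManagedOptions();
         ManagedSstFileReader reader = new ManagedSstFileReader(options)) {
      reader.open("/tmp/example.sst");
      long keys = reader.getTableProperties().getNumEntries();
    }
    // leaving the block closes the native reader, then releases the tracker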
hadoop-hdds/rocksdb-checkpoint-differ/pom.xml (0 additions, 37 deletions)
@@ -106,43 +106,6 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<id>depcheck</id>
<phase></phase>
</execution>
-          <execution>
-            <id>banned-rocksdb-imports</id>
-            <phase>process-sources</phase>
-            <goals>
-              <goal>enforce</goal>
-            </goals>
-            <configuration>
-              <rules>
-                <RestrictImports>
-                  <includeTestCode>false</includeTestCode>
-                  <reason>Use managed RocksObjects under org.apache.hadoop.hdds.utils.db.managed instead.</reason>
-                  <!-- By default, ban all the classes in org.rocksdb -->
-                  <bannedImport>org.rocksdb.**</bannedImport>
-                  <allowedImports>
-                    <allowedImport>org.rocksdb.AbstractEventListener</allowedImport>
-                    <allowedImport>org.rocksdb.Checkpoint</allowedImport>
-                    <allowedImport>org.rocksdb.ColumnFamilyDescriptor</allowedImport>
-                    <allowedImport>org.rocksdb.ColumnFamilyHandle</allowedImport>
-                    <allowedImport>org.rocksdb.ColumnFamilyOptions</allowedImport>
-                    <allowedImport>org.rocksdb.CompactionJobInfo</allowedImport>
-                    <allowedImport>org.rocksdb.CompressionType</allowedImport>
-                    <allowedImport>org.rocksdb.DBOptions</allowedImport>
-                    <allowedImport>org.rocksdb.FlushOptions</allowedImport>
-                    <allowedImport>org.rocksdb.LiveFileMetaData</allowedImport>
-                    <allowedImport>org.rocksdb.Options</allowedImport>
-                    <allowedImport>org.rocksdb.RocksDB</allowedImport>
-                    <allowedImport>org.rocksdb.RocksDBException</allowedImport>
-                    <allowedImport>org.rocksdb.SstFileReader</allowedImport>
-                    <allowedImport>org.rocksdb.TableProperties</allowedImport>
-                    <allowedImport>org.rocksdb.ReadOptions</allowedImport>
-                    <allowedImport>org.rocksdb.SstFileReaderIterator</allowedImport>
-                  </allowedImports>
-                  <exclusion>org.apache.hadoop.hdds.utils.db.managed.*</exclusion>
-                </RestrictImports>
-              </rules>
-            </configuration>
-          </execution>
</executions>
</plugin>
</plugins>
@@ -23,13 +23,12 @@
import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileReader;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileIterator;
import org.apache.hadoop.hdds.utils.db.managed.ManagedSlice;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReader;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReaderIterator;
import org.apache.hadoop.util.ClosableIterator;
import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
-import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
-import org.rocksdb.SstFileReader;
-import org.rocksdb.SstFileReaderIterator;

import java.io.IOException;
import java.io.UncheckedIOException;
@@ -78,7 +77,7 @@ public long getEstimatedTotalKeys() throws RocksDBException {

try (ManagedOptions options = new ManagedOptions()) {
for (String sstFile : sstFiles) {
-        try (SstFileReader fileReader = new SstFileReader(options)) {
+        try (ManagedSstFileReader fileReader = new ManagedSstFileReader(options)) {
fileReader.open(sstFile);
estimatedSize += fileReader.getTableProperties().getNumEntries();
}
@@ -95,7 +94,7 @@ public Stream<String> getKeyStream(String lowerBound,
// TODO: [SNAPSHOT] Check if default Options and ReadOptions is enough.
final MultipleSstFileIterator<String> itr = new MultipleSstFileIterator<String>(sstFiles) {
private ManagedOptions options;
-      private ReadOptions readOptions;
+      private ManagedReadOptions readOptions;

private ManagedSlice lowerBoundSLice;

@@ -122,9 +121,8 @@ protected void init() {
protected ClosableIterator<String> getKeyIteratorForFile(String file) throws RocksDBException {
return new ManagedSstFileIterator(file, options, readOptions) {
@Override
-          protected String getIteratorValue(
-              SstFileReaderIterator iterator) {
-            return new String(iterator.key(), UTF_8);
+          protected String getIteratorValue(ManagedSstFileReaderIterator iterator) {
+            return new String(iterator.get().key(), UTF_8);
}
};
}
@@ -177,14 +175,15 @@ public void close() throws UncheckedIOException {
}

private abstract static class ManagedSstFileIterator implements ClosableIterator<String> {
-    private final SstFileReader fileReader;
-    private final SstFileReaderIterator fileReaderIterator;
+    private final ManagedSstFileReader fileReader;
+    private final ManagedSstFileReaderIterator fileReaderIterator;

-    ManagedSstFileIterator(String path, ManagedOptions options, ReadOptions readOptions) throws RocksDBException {
-      this.fileReader = new SstFileReader(options);
+    ManagedSstFileIterator(String path, ManagedOptions options, ManagedReadOptions readOptions)
+        throws RocksDBException {
+      this.fileReader = new ManagedSstFileReader(options);
this.fileReader.open(path);
-      this.fileReaderIterator = fileReader.newIterator(readOptions);
-      fileReaderIterator.seekToFirst();
+      this.fileReaderIterator = ManagedSstFileReaderIterator.managed(fileReader.newIterator(readOptions));
+      fileReaderIterator.get().seekToFirst();
}

@Override
@@ -195,15 +194,15 @@ public void close() {

@Override
public boolean hasNext() {
-      return fileReaderIterator.isValid();
+      return fileReaderIterator.get().isValid();
}

-    protected abstract String getIteratorValue(SstFileReaderIterator iterator);
+    protected abstract String getIteratorValue(ManagedSstFileReaderIterator iterator);

@Override
public String next() {
String value = getIteratorValue(fileReaderIterator);
-      fileReaderIterator.next();
+      fileReaderIterator.get().next();
return value;
}
}
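
Unlike the reader, the iterator keeps the wrapper style: the raw SstFileReaderIterator is wrapped via ManagedSstFileReaderIterator.managed(...) and unwrapped with get() at each use. A minimal sketch of that flow, assuming an already-opened ManagedSstFileReader and a ManagedReadOptions in scope:

    try (ManagedSstFileReaderIterator iter =
        ManagedSstFileReaderIterator.managed(fileReader.newIterator(readOptions))) {
      iter.get().seekToFirst();
      while (iter.get().isValid()) {
        byte[] key = iter.get().key();  // consume the current key
        iter.get().next();
      }
    }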
@@ -41,6 +41,7 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.CompactionLogEntryProto;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.Scheduler;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
@@ -55,12 +56,9 @@
import org.rocksdb.AbstractEventListener;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.CompactionJobInfo;
-import org.rocksdb.DBOptions;
import org.rocksdb.LiveFileMetaData;
-import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
-import org.rocksdb.SstFileReader;
import org.rocksdb.TableProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -177,7 +175,7 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
private AtomicBoolean suspended;

private ColumnFamilyHandle compactionLogTableCFHandle;
-  private RocksDB activeRocksDB;
+  private ManagedRocksDB activeRocksDB;

/**
* For snapshot diff calculation we only need to track following column
@@ -349,32 +347,11 @@ public static void addDebugLevel(Integer level) {
DEBUG_LEVEL.add(level);
}

-  /**
-   * Takes {@link org.rocksdb.Options}.
-   */
-  public void setRocksDBForCompactionTracking(Options rocksOptions,
-      List<AbstractEventListener> list) {
-    list.add(newCompactionBeginListener());
-    list.add(newCompactionCompletedListener());
-    rocksOptions.setListeners(list);
-  }
-
-  public void setRocksDBForCompactionTracking(Options rocksOptions) {
-    setRocksDBForCompactionTracking(rocksOptions, new ArrayList<>());
-  }
-
-  /**
-   * Takes {@link org.rocksdb.DBOptions}.
-   */
-  public void setRocksDBForCompactionTracking(DBOptions rocksOptions,
-      List<AbstractEventListener> list) {
-    list.add(newCompactionBeginListener());
-    list.add(newCompactionCompletedListener());
-    rocksOptions.setListeners(list);
-  }
-
-  public void setRocksDBForCompactionTracking(DBOptions rocksOptions) {
-    setRocksDBForCompactionTracking(rocksOptions, new ArrayList<>());
+  public void setRocksDBForCompactionTracking(ManagedDBOptions rocksOptions) {
+    List<AbstractEventListener> events = new ArrayList<>();
+    events.add(newCompactionBeginListener());
+    events.add(newCompactionCompletedListener());
+    rocksOptions.setListeners(events);
}

/**
@@ -403,7 +380,7 @@ public synchronized void setCompactionLogTableCFHandle(
* Set activeRocksDB to access CompactionLogTable.
* @param activeRocksDB RocksDB
*/
-  public synchronized void setActiveRocksDB(RocksDB activeRocksDB) {
+  public synchronized void setActiveRocksDB(ManagedRocksDB activeRocksDB) {
Preconditions.checkNotNull(activeRocksDB, "RocksDB should not be null.");
this.activeRocksDB = activeRocksDB;
}
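
With both setters converted, the caller-side wiring (partially visible in the RDBStore hunk above) looks roughly like this hedged sketch; variable names are illustrative:

    ManagedDBOptions dbOptions = new ManagedDBOptions();
    differ.setRocksDBForCompactionTracking(dbOptions);  // attach listeners before opening the DB
    // ... open the ManagedRocksDB with dbOptions ...
    differ.setCompactionLogTableCFHandle(compactionLogTableCF.getHandle());
    differ.setActiveRocksDB(db.getManagedRocksDb());    // pass the managed handle, no .get()
    differ.loadAllCompactionLogs();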
@@ -436,8 +413,7 @@ private boolean isSnapshotInfoTableEmpty(RocksDB db) {
// Note the goal of compaction DAG is to track all compactions that happened
// _after_ a DB checkpoint is taken.

-    try (ManagedRocksIterator it = ManagedRocksIterator.managed(
-        db.newIterator(snapshotInfoTableCFHandle))) {
+    try (ManagedRocksIterator it = ManagedRocksIterator.managed(db.newIterator(snapshotInfoTableCFHandle))) {
it.get().seekToFirst();
return !it.get().isValid();
}
@@ -499,7 +475,6 @@ public void onCompactionBegin(RocksDB db,
};
}

-
private AbstractEventListener newCompactionCompletedListener() {
return new AbstractEventListener() {
@Override
@@ -577,7 +552,7 @@ void addToCompactionLogTable(CompactionLogEntry compactionLogEntry) {
byte[] key = keyString.getBytes(UTF_8);
byte[] value = compactionLogEntry.getProtobuf().toByteArray();
try {
-      activeRocksDB.put(compactionLogTableCFHandle, key, value);
+      activeRocksDB.get().put(compactionLogTableCFHandle, key, value);
} catch (RocksDBException exception) {
// TODO: Revisit exception handling before merging the PR.
throw new RuntimeException(exception);
@@ -631,11 +606,11 @@ private long getSSTFileSummary(String filename)
}

try (ManagedOptions option = new ManagedOptions();
-        ManagedSstFileReader reader = ManagedSstFileReader.managed(new SstFileReader(option))) {
+        ManagedSstFileReader reader = new ManagedSstFileReader(option)) {

-      reader.get().open(getAbsoluteSstFilePath(filename));
+      reader.open(getAbsoluteSstFilePath(filename));

-      TableProperties properties = reader.get().getTableProperties();
+      TableProperties properties = reader.getTableProperties();
if (LOG.isDebugEnabled()) {
LOG.debug("{} has {} keys", filename, properties.getNumEntries());
}
@@ -801,7 +776,7 @@ public void loadAllCompactionLogs() {
preconditionChecksForLoadAllCompactionLogs();
addEntriesFromLogFilesToDagAndCompactionLogTable();
try (ManagedRocksIterator managedRocksIterator = new ManagedRocksIterator(
-        activeRocksDB.newIterator(compactionLogTableCFHandle))) {
+        activeRocksDB.get().newIterator(compactionLogTableCFHandle))) {
managedRocksIterator.get().seekToFirst();
while (managedRocksIterator.get().isValid()) {
byte[] value = managedRocksIterator.get().value();
@@ -1252,7 +1227,7 @@ private synchronized Pair<Set<String>, List<byte[]>> getOlderFileNodes() {
List<byte[]> keysToRemove = new ArrayList<>();

try (ManagedRocksIterator managedRocksIterator = new ManagedRocksIterator(
-        activeRocksDB.newIterator(compactionLogTableCFHandle))) {
+        activeRocksDB.get().newIterator(compactionLogTableCFHandle))) {
managedRocksIterator.get().seekToFirst();
while (managedRocksIterator.get().isValid()) {
CompactionLogEntry compactionLogEntry = CompactionLogEntry
@@ -1282,7 +1257,7 @@ private synchronized void removeKeyFromCompactionLogTable(
List<byte[]> keysToRemove) {
try {
for (byte[] key: keysToRemove) {
-        activeRocksDB.delete(compactionLogTableCFHandle, key);
+        activeRocksDB.get().delete(compactionLogTableCFHandle, key);
}
} catch (RocksDBException exception) {
// TODO Handle exception properly before merging the PR.
@@ -1575,11 +1550,11 @@ private CompactionFileInfo toFileInfo(String sstFile,
CompactionFileInfo.Builder fileInfoBuilder =
new CompactionFileInfo.Builder(fileName);

-    try (ManagedSstFileReader fileReader = ManagedSstFileReader.managed(new SstFileReader(options))) {
-      fileReader.get().open(sstFile);
-      String columnFamily = StringUtils.bytes2String(fileReader.get().getTableProperties().getColumnFamilyName());
+    try (ManagedSstFileReader fileReader = new ManagedSstFileReader(options)) {
+      fileReader.open(sstFile);
+      String columnFamily = StringUtils.bytes2String(fileReader.getTableProperties().getColumnFamilyName());
try (ManagedSstFileReaderIterator iterator =
-               ManagedSstFileReaderIterator.managed(fileReader.get().newIterator(readOptions))) {
+               ManagedSstFileReaderIterator.managed(fileReader.newIterator(readOptions))) {
iterator.get().seekToFirst();
String startKey = StringUtils.bytes2String(iterator.get().key());
iterator.get().seekToLast();
@@ -22,7 +22,6 @@
import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReader;
import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReaderIterator;
-import org.rocksdb.SstFileReader;
import org.rocksdb.TableProperties;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
@@ -90,17 +89,17 @@ public static boolean doesSstFileContainKeyRange(String filepath,

try (
ManagedOptions options = new ManagedOptions();
-        ManagedSstFileReader sstFileReader = ManagedSstFileReader.managed(new SstFileReader(options))) {
-      sstFileReader.get().open(filepath);
-      TableProperties properties = sstFileReader.get().getTableProperties();
+        ManagedSstFileReader sstFileReader = new ManagedSstFileReader(options)) {
+      sstFileReader.open(filepath);
+      TableProperties properties = sstFileReader.getTableProperties();
String tableName = new String(properties.getColumnFamilyName(), UTF_8);
if (tableToPrefixMap.containsKey(tableName)) {
String prefix = tableToPrefixMap.get(tableName);

try (
ManagedReadOptions readOptions = new ManagedReadOptions();
ManagedSstFileReaderIterator iterator = ManagedSstFileReaderIterator.managed(
-              sstFileReader.get().newIterator(readOptions))) {
+              sstFileReader.newIterator(readOptions))) {
iterator.get().seek(prefix.getBytes(UTF_8));
String seekResultKey = new String(iterator.get().key(), UTF_8);
return seekResultKey.startsWith(prefix);