Skip to content

Commit

Permalink
Addressed review comment
Browse files Browse the repository at this point in the history
  • Loading branch information
hemantk-12 committed Nov 10, 2023
1 parent 9e040b2 commit 9e7a847
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
private ColumnFamilyHandle compactionLogTableCFHandle;
private RocksDB activeRocksDB;

/**
* Column families that need to be tracked in the compaction DAG for
* snapshot diff calculation.
*/
public static final Set<String> COLUMN_FAMILIES_TO_TRACK_IN_DAG =
ImmutableSet.of("keyTable", "directoryTable", "fileTable");
/**
Expand Down Expand Up @@ -442,6 +445,15 @@ private AbstractEventListener newCompactionBeginListener() {
@Override
public void onCompactionBegin(RocksDB db,
CompactionJobInfo compactionJobInfo) {
String columnFamily =
StringUtils.bytes2String(compactionJobInfo.columnFamilyName());

if (!COLUMN_FAMILIES_TO_TRACK_IN_DAG.contains(columnFamily)) {
LOG.debug("Skipping compaction begin for columnFamily: {}",
columnFamily);
return;
}

if (compactionJobInfo.inputFiles().size() == 0) {
LOG.error("Compaction input files list is empty");
return;
Expand All @@ -458,38 +470,29 @@ public void onCompactionBegin(RocksDB db,
return;
}
}
createHardLinks(compactionJobInfo.inputFiles());
}
};
}

@VisibleForTesting
void createHardLinks(List<String> sourceFiles) {
try (ManagedOptions options = new ManagedOptions();
ManagedReadOptions readOptions = new ManagedReadOptions()) {
// Create hardlink backups for the SST files that are going
// to be deleted after this RDB compaction.
for (String file : sourceFiles) {
CompactionFileInfo fileInfo =
toFileInfo(file, options, readOptions);
if (shouldSkipFile(fileInfo)) {
LOG.debug("Skipping hard link for sst file '{}' belongs to " +
"column family '{}'.", file, fileInfo.getColumnFamily());
} else {
LOG.debug("Creating hard link for sst file '{}' belongs to " +
"column family '{}'", file, fileInfo.getColumnFamily());
for (String file : compactionJobInfo.inputFiles()) {
createLink(Paths.get(sstBackupDir, new File(file).getName()),
Paths.get(file));
}
}
}
};
}


private AbstractEventListener newCompactionCompletedListener() {
return new AbstractEventListener() {
@Override
public void onCompactionCompleted(RocksDB db,
CompactionJobInfo compactionJobInfo) {
String columnFamily =
StringUtils.bytes2String(compactionJobInfo.columnFamilyName());

if (!COLUMN_FAMILIES_TO_TRACK_IN_DAG.contains(columnFamily)) {
LOG.debug("Skipping compaction completed for columnFamily: {}",
columnFamily);
return;
}

if (compactionJobInfo.inputFiles().isEmpty()) {
LOG.error("Compaction input files list is empty");
Expand Down Expand Up @@ -1533,22 +1536,6 @@ public void pngPrintMutableGraph(String filePath, GraphType graphType)
graph.generateImage(filePath);
}

/**
 * Returns true if compactionFileInfo carries column family information and
 * that column family is not one of the COLUMN_FAMILIES_TO_TRACK_IN_DAG.
 * Otherwise returns false, meaning the file should be added to the
 * compaction DAG.
 * CompactionFileInfo lacks column family information in two cases:
 * i). Backward compatibility — before HDDS-8940 the column family was not
 * persisted.
 * ii). When reading the SST file fails in {@link #toFileInfo}.
 */
@VisibleForTesting
boolean shouldSkipFile(CompactionFileInfo compactionFileInfo) {
  // A null column family means "unknown" — never skip those files.
  String columnFamily = compactionFileInfo.getColumnFamily();
  if (columnFamily == null) {
    return false;
  }
  return !COLUMN_FAMILIES_TO_TRACK_IN_DAG.contains(columnFamily);
}

private List<CompactionFileInfo> toFileInfoList(List<String> sstFiles,
ManagedOptions options,
ManagedReadOptions readOptions
Expand All @@ -1561,14 +1548,7 @@ private List<CompactionFileInfo> toFileInfoList(List<String> sstFiles,

for (String sstFile : sstFiles) {
CompactionFileInfo fileInfo = toFileInfo(sstFile, options, readOptions);
if (shouldSkipFile(fileInfo)) {
LOG.debug("Skipping sst file: '{}' belongs to column family '{}'.",
sstFile, fileInfo.getColumnFamily());
} else {
LOG.debug("Adding sst file: '{}' belongs to column family '{}' to DAG.",
sstFile, fileInfo.getColumnFamily());
response.add(fileInfo);
}
response.add(fileInfo);
}
return response;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
Expand All @@ -82,7 +83,7 @@
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.SstFileMetaData;
import org.rocksdb.SstFileReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
Expand Down Expand Up @@ -1829,67 +1830,6 @@ public void testShouldSkipNodeEdgeCase(
columnFamilyToPrefixMap));
}

/**
 * Parameterized inputs for {@code testShouldSkipFile}: a column family
 * name (or null when the SST file could not be read) paired with the
 * expected skip decision. Only 'keyTable', 'directoryTable' and
 * 'fileTable' are tracked in the DAG, so only those (and the null case)
 * expect {@code false}.
 */
private static Stream<Arguments> shouldSkipFileCases() {
  return Stream.of(
      Arguments.of("volumeTable", true),
      Arguments.of("bucketTable", true),
      Arguments.of("keyTable", false),
      Arguments.of("directoryTable", false),
      Arguments.of("fileTable", false),
      Arguments.of("snapshotInfoTable", true),
      Arguments.of("compactionLogTable", true),
      Arguments.of(null, false)); // case when failed to read SST
}

/**
 * Verifies {@code shouldSkipFile} for every column family case supplied
 * by {@code shouldSkipFileCases}.
 */
@ParameterizedTest
@MethodSource("shouldSkipFileCases")
public void testShouldSkipFile(String columnFamily,
    boolean expectedResult) {
  // File name and key range are irrelevant to the skip decision; only
  // the column family field is consulted.
  CompactionFileInfo fileInfo = new CompactionFileInfo(
      "fileName", "startKey", "endKey", columnFamily);

  boolean actual = rocksDBCheckpointDiffer.shouldSkipFile(fileInfo);
  assertEquals(expectedResult, actual);
}

// End-to-end test verifying that only 'keyTable', 'directoryTable' and
// 'fileTable' column families' SST files are hard-linked to the SST
// back-up dir by createHardLinks().
@Test
public void testCreateHardLinks() throws RocksDBException, IOException {
// To skip 'isSnapshotInfoTableEmpty' check in 'newCompactionBeginListener'.
rocksDBCheckpointDiffer.setSnapshotInfoTableCFHandle(
Mockito.mock(ColumnFamilyHandle.class));
// Populate both tracked (key/directory/file tables) and untracked
// (compactionLogTable) column families so live SST files exist in
// both categories.
createKeys(keyTableCFHandle, "keyName-", "keyValue-", 100);
createKeys(directoryTableCFHandle, "dirName-", "dirValue-", 10);
createKeys(fileTableCFHandle, "fileName-", "fileValue-", 100);
createKeys(compactionLogTableCFHandle, "logName-", "logValue-", 10);

List<LiveFileMetaData> liveFilesMetaData =
activeRocksDB.getLiveFilesMetaData();

// Expected back-up content: only files belonging to tracked column
// families, identified by decoding the CF name bytes as UTF-8.
List<String> expectedFilesInBackDir = liveFilesMetaData.stream()
.filter(file -> COLUMN_FAMILIES_TO_TRACK_IN_DAG.contains(
new String(file.columnFamilyName(), UTF_8)))
.map(SstFileMetaData::fileName)
.sorted()
.collect(Collectors.toList());

// Full paths of every live SST file, tracked or not; createHardLinks
// is expected to filter out the untracked ones itself.
List<String> sstFilesInActiveDB = liveFilesMetaData
.stream()
.map(file -> file.path() + file.fileName())
.collect(Collectors.toList());

rocksDBCheckpointDiffer.createHardLinks(sstFilesInActiveDB);
List<String> actualFilesInBackDir;
try (Stream<Path> pathStream = Files.list(
Paths.get(rocksDBCheckpointDiffer.getSSTBackupDir()))) {
// Prepend '/' to match SstFileMetaData.fileName()'s format —
// presumably those names carry a leading slash; confirm against the
// RocksDB Java API if this assertion ever flakes.
actualFilesInBackDir = pathStream
.map(file -> "/" + file.getFileName().toString())
.sorted().collect(Collectors.toList());
}

assertEquals(expectedFilesInBackDir, actualFilesInBackDir);
}

private void createKeys(ColumnFamilyHandle cfh,
String keyPrefix,
String valuePrefix,
Expand All @@ -1910,7 +1850,8 @@ private void createKeys(ColumnFamilyHandle cfh,
// End-to-end to verify that only 'keyTable', 'directoryTable'
// and 'fileTable' column families SST files are added to compaction DAG.
@Test
public void testDag() throws RocksDBException {
public void testDagOnlyContainsDesiredCfh()
throws RocksDBException, IOException {
// Setting is not non-empty table so that 'isSnapshotInfoTableEmpty'
// returns true.
rocksDBCheckpointDiffer.setSnapshotInfoTableCFHandle(keyTableCFHandle);
Expand All @@ -1931,5 +1872,22 @@ public void testDag() throws RocksDBException {
// CompactionNodeMap should not contain any node other than 'keyTable',
// 'directoryTable' and 'fileTable' column families nodes.
assertTrue(compactionNodes.isEmpty());

// Assert that only 'keyTable', 'directoryTable' and 'fileTable'
// column families SST files are backed-up.
try (ManagedOptions options = new ManagedOptions();
Stream<Path> pathStream = Files.list(
Paths.get(rocksDBCheckpointDiffer.getSSTBackupDir()))) {
pathStream.forEach(path -> {
try (SstFileReader fileReader = new SstFileReader(options)) {
fileReader.open(path.toAbsolutePath().toString());
String columnFamily = StringUtils.bytes2String(
fileReader.getTableProperties().getColumnFamilyName());
assertTrue(COLUMN_FAMILIES_TO_TRACK_IN_DAG.contains(columnFamily));
} catch (RocksDBException rocksDBException) {
fail("Failed to read file: " + path.toAbsolutePath());
}
});
}
}
}

0 comments on commit 9e7a847

Please sign in to comment.