HDDS-11914. Snapshot diff should not filter SST files by reading them with the SST file reader #7563

Merged 9 commits on Dec 13, 2024.
ManagedRocksDB.java
@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hdds.utils.db.managed;

import org.apache.commons.io.FilenameUtils;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
@@ -31,6 +32,8 @@
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Managed {@link RocksDB}.
@@ -102,4 +105,14 @@ public void deleteFile(LiveFileMetaData fileToBeDeleted)
File file = new File(fileToBeDeleted.path(), fileToBeDeleted.fileName());
ManagedRocksObjectUtils.waitForFileDelete(file, Duration.ofSeconds(60));
}

public static Map<String, LiveFileMetaData> getLiveMetadataForSSTFiles(RocksDB db) {
return db.getLiveFilesMetaData().stream().collect(
Collectors.toMap(liveFileMetaData -> FilenameUtils.getBaseName(liveFileMetaData.fileName()),
liveFileMetaData -> liveFileMetaData));
}

public Map<String, LiveFileMetaData> getLiveMetadataForSSTFiles() {
return getLiveMetadataForSSTFiles(this.get());
}
}
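The new static helper is the heart of this change: instead of opening every SST file with an SST file reader, callers pull column-family and key-range information straight from RocksDB's live-file metadata, keyed by SST base name ("000050" rather than "000050.sst"). A minimal, self-contained sketch of the same indexing pattern against a plain RocksDB instance follows; the DB path is an illustrative assumption.

import org.apache.commons.io.FilenameUtils;
import org.rocksdb.LiveFileMetaData;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

import java.util.Map;
import java.util.stream.Collectors;

public final class LiveSstIndexSketch {
  public static void main(String[] args) throws RocksDBException {
    // Illustrative path; any open RocksDB instance works the same way.
    try (Options options = new Options().setCreateIfMissing(true);
         RocksDB db = RocksDB.open(options, "/tmp/sketch-db")) {
      // Same keying as getLiveMetadataForSSTFiles: "000050.sst" -> "000050".
      Map<String, LiveFileMetaData> byBaseName = db.getLiveFilesMetaData().stream()
          .collect(Collectors.toMap(
              meta -> FilenameUtils.getBaseName(meta.fileName()),
              meta -> meta));
      // Key ranges come from metadata; no SST file is ever opened.
      byBaseName.forEach((name, meta) -> System.out.printf(
          "%s: smallestKey=%s largestKey=%s%n",
          name, new String(meta.smallestKey()), new String(meta.largestKey())));
    }
  }
}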
CompactionFileInfo.java
@@ -20,7 +20,9 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.rocksdb.LiveFileMetaData;

import java.util.Objects;

@@ -128,6 +130,16 @@ public Builder setColumnFamily(String columnFamily) {
return this;
}

public Builder setValues(LiveFileMetaData fileMetaData) {
if (fileMetaData != null) {
String columnFamilyName = StringUtils.bytes2String(fileMetaData.columnFamilyName());
String startRangeValue = StringUtils.bytes2String(fileMetaData.smallestKey());
String endRangeValue = StringUtils.bytes2String(fileMetaData.largestKey());
this.setColumnFamily(columnFamilyName).setStartRange(startRangeValue).setEndRange(endRangeValue);
}
return this;
}

public CompactionFileInfo build() {
if ((startRange != null || endRange != null || columnFamily != null) &&
(startRange == null || endRange == null || columnFamily == null)) {
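setValues() is what lets compaction tracking run on metadata alone: it copies the column family, smallest key, and largest key out of a LiveFileMetaData when one is available, and leaves the builder untouched when it is null, in which case build() produces a name-only entry. A hedged usage sketch; the map and file name are illustrative assumptions.

import org.apache.ozone.compaction.log.CompactionFileInfo;
import org.rocksdb.LiveFileMetaData;

import java.util.Map;

final class FileInfoSketch {
  // Build a CompactionFileInfo from live metadata when present; a null
  // LiveFileMetaData leaves all range fields unset, which build() accepts.
  static CompactionFileInfo toFileInfo(Map<String, LiveFileMetaData> liveFiles, String baseName) {
    LiveFileMetaData meta = liveFiles.get(baseName); // may be null
    return new CompactionFileInfo.Builder(baseName)
        .setValues(meta)
        .build();
  }
}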
CompactionNode.java
@@ -17,6 +17,8 @@
*/
package org.apache.ozone.rocksdiff;

import org.apache.ozone.compaction.log.CompactionFileInfo;

/**
* Node in the compaction DAG that represents an SST file.
*/
@@ -48,6 +50,11 @@ public CompactionNode(String file, long numKeys, long seqNum,
this.columnFamily = columnFamily;
}

public CompactionNode(CompactionFileInfo compactionFileInfo) {
this(compactionFileInfo.getFileName(), -1, -1, compactionFileInfo.getStartKey(),
compactionFileInfo.getEndKey(), compactionFileInfo.getColumnFamily());
}

@Override
public String toString() {
return String.format("Node{%s}", fileName);
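The new constructor turns persisted compaction-log entries back into DAG nodes. CompactionFileInfo does not record key counts or sequence numbers, so both are stored as -1 sentinels; the diff traversal only needs the file name, key range, and column family. A short sketch with illustrative values:

import org.apache.ozone.compaction.log.CompactionFileInfo;
import org.apache.ozone.rocksdiff.CompactionNode;

final class NodeSketch {
  static CompactionNode fromLogEntry() {
    // Illustrative file name, keys, and column family.
    CompactionFileInfo fileInfo = new CompactionFileInfo.Builder("000060")
        .setStartRange("/vol1/bucket1/key-a")
        .setEndRange("/vol1/bucket1/key-z")
        .setColumnFamily("keyTable")
        .build();
    return new CompactionNode(fileInfo); // numKeys and seqNum default to -1
  }
}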
RocksDBCheckpointDiffer.java
@@ -26,6 +26,7 @@
import java.io.FileNotFoundException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@@ -34,6 +35,7 @@
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -42,11 +44,9 @@
import org.apache.hadoop.hdds.utils.Scheduler;
import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
-import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
-import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReader;
-import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReaderIterator;
import org.apache.ozone.compaction.log.CompactionFileInfo;
import org.apache.ozone.compaction.log.CompactionLogEntry;
import org.apache.ozone.rocksdb.util.RdbUtil;
@@ -74,7 +74,6 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -174,6 +173,7 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,

private ColumnFamilyHandle compactionLogTableCFHandle;
private ManagedRocksDB activeRocksDB;
private ConcurrentMap<String, CompactionFileInfo> inflightCompactions;

/**
* For snapshot diff calculation we only need to track the following column
@@ -245,6 +245,7 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
} else {
this.scheduler = null;
}
this.inflightCompactions = new ConcurrentHashMap<>();
}

private String createCompactionLogDir(String metadataDirName,
@@ -463,7 +464,7 @@ public void onCompactionBegin(RocksDB db,
return;
}
}

+inflightCompactions.putAll(toFileInfoList(compactionJobInfo.inputFiles(), db));
for (String file : compactionJobInfo.inputFiles()) {
createLink(Paths.get(sstBackupDir, new File(file).getName()),
Paths.get(file));
@@ -484,25 +485,28 @@ public void onCompactionCompleted(RocksDB db,
}

long trxId = db.getLatestSequenceNumber();

+Map<String, CompactionFileInfo> inputFileCompactions = toFileInfoList(compactionJobInfo.inputFiles(), db);
CompactionLogEntry.Builder builder;
-try (ManagedOptions options = new ManagedOptions();
-ManagedReadOptions readOptions = new ManagedReadOptions()) {
-builder = new CompactionLogEntry.Builder(trxId,
-System.currentTimeMillis(),
-toFileInfoList(compactionJobInfo.inputFiles(), options,
-readOptions),
-toFileInfoList(compactionJobInfo.outputFiles(), options,
-readOptions));
-}
+builder = new CompactionLogEntry.Builder(trxId,
+System.currentTimeMillis(),
+inputFileCompactions.keySet().stream()
+.map(inputFile -> {
+if (!inflightCompactions.containsKey(inputFile)) {
+LOG.warn("Input file {} not found in the inflightCompactions map; it should have been added by the " +
+"compaction-begin listener.", inputFile);
+}
+return inflightCompactions.getOrDefault(inputFile, inputFileCompactions.get(inputFile));
+})
+.collect(Collectors.toList()),
+new ArrayList<>(toFileInfoList(compactionJobInfo.outputFiles(), db).values()));

if (LOG.isDebugEnabled()) {
builder = builder.setCompactionReason(
compactionJobInfo.compactionReason().toString());
}

CompactionLogEntry compactionLogEntry = builder.build();

synchronized (this) {
if (closed) {
return;
@@ -521,6 +525,9 @@ public void onCompactionCompleted(RocksDB db,
populateCompactionDAG(compactionLogEntry.getInputFileInfoList(),
compactionLogEntry.getOutputFileInfoList(),
compactionLogEntry.getDbSequenceNumber());
+for (String inputFile : inputFileCompactions.keySet()) {
+inflightCompactions.remove(inputFile);
+}
}
}
};
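The inflightCompactions map exists because of a timing gap in RocksDB's listener API: input files are still part of the live set in onCompactionBegin, but by onCompactionCompleted they may already have been deleted, so their metadata has to be captured up front. A minimal sketch of that wiring, not the production setup, with the handling reduced to comments:

import org.rocksdb.AbstractEventListener;
import org.rocksdb.CompactionJobInfo;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;

import java.util.Collections;

final class CompactionListenerSketch {
  static DBOptions withCompactionTracking() {
    DBOptions dbOptions = new DBOptions().setCreateIfMissing(true);
    dbOptions.setListeners(Collections.singletonList(new AbstractEventListener() {
      @Override
      public void onCompactionBegin(RocksDB db, CompactionJobInfo job) {
        // Input files are still in the live set here: snapshot their
        // metadata now (the differ stores it in inflightCompactions).
      }

      @Override
      public void onCompactionCompleted(RocksDB db, CompactionJobInfo job) {
        // Inputs may already be gone from the live set: resolve them from
        // the snapshot taken above, then drop them from the inflight map.
      }
    }));
    return dbOptions;
  }
}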
@@ -789,7 +796,7 @@ private void preconditionChecksForLoadAllCompactionLogs() {
* and appends the extension '.sst'.
*/
private String getSSTFullPath(String sstFilenameWithoutExtension,
-String dbPath) {
+String... dbPaths) {

// Try to locate the SST in the backup dir first
final Path sstPathInBackupDir = Paths.get(sstBackupDir,
@@ -800,11 +807,13 @@ private String getSSTFullPath(String sstFilenameWithoutExtension,

// SST file does not exist in the SST backup dir; this means the SST file
// has not gone through any compactions yet and is only available in the
-// src DB directory
-final Path sstPathInDBDir = Paths.get(dbPath,
-sstFilenameWithoutExtension + SST_FILE_EXTENSION);
-if (Files.exists(sstPathInDBDir)) {
-return sstPathInDBDir.toString();
+// src DB directory or the dest DB directory
+for (String dbPath : dbPaths) {
+final Path sstPathInDBDir = Paths.get(dbPath,
+sstFilenameWithoutExtension + SST_FILE_EXTENSION);
+if (Files.exists(sstPathInDBDir)) {
+return sstPathInDBDir.toString();
+}
}

// TODO: More graceful error handling?
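With the varargs change, the lookup order is: SST backup dir first, then each supplied DB directory (source, then destination). A tiny standalone sketch of the same search pattern; directory names and the null miss-result are illustrative choices, since the caller decides how a miss is handled:

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

final class SstLookupSketch {
  // Mirrors the search order above: backup dir first, then each DB dir.
  static String find(String baseName, String backupDir, String... dbDirs) {
    Path inBackup = Paths.get(backupDir, baseName + ".sst");
    if (Files.exists(inBackup)) {
      return inBackup.toString();
    }
    for (String dir : dbDirs) {
      Path inDb = Paths.get(dir, baseName + ".sst");
      if (Files.exists(inDb)) {
        return inDb.toString();
      }
    }
    return null; // caller decides how to handle a miss
  }
}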
@@ -825,18 +834,16 @@ private String getSSTFullPath(String sstFilenameWithoutExtension,
* e.g. ["/path/to/sstBackupDir/000050.sst",
* "/path/to/sstBackupDir/000060.sst"]
*/
-public synchronized List<String> getSSTDiffListWithFullPath(
-DifferSnapshotInfo src,
-DifferSnapshotInfo dest,
-String sstFilesDirForSnapDiffJob
-) throws IOException {
+public synchronized List<String> getSSTDiffListWithFullPath(DifferSnapshotInfo src,
+DifferSnapshotInfo dest,
+String sstFilesDirForSnapDiffJob) {

List<String> sstDiffList = getSSTDiffList(src, dest);

return sstDiffList.stream()
.map(
sst -> {
-String sstFullPath = getSSTFullPath(sst, src.getDbPath());
+String sstFullPath = getSSTFullPath(sst, src.getDbPath(), dest.getDbPath());
Path link = Paths.get(sstFilesDirForSnapDiffJob,
sst + SST_FILE_EXTENSION);
Path srcFile = Paths.get(sstFullPath);
@@ -858,9 +865,7 @@ public synchronized List<String> getSSTDiffListWithFullPath(
* @return A list of SST files without extension. e.g. ["000050", "000060"]
*/
public synchronized List<String> getSSTDiffList(
-DifferSnapshotInfo src,
-DifferSnapshotInfo dest
-) throws IOException {
+DifferSnapshotInfo src, DifferSnapshotInfo dest) {

// TODO: Reject or swap if dest is taken after src, once snapshot chain
// integration is done.
@@ -895,30 +900,13 @@ public synchronized List<String> getSSTDiffList(
}

if (src.getTablePrefixes() != null && !src.getTablePrefixes().isEmpty()) {
-filterRelevantSstFilesFullPath(fwdDAGDifferentFiles,
-src.getTablePrefixes());
+RocksDiffUtils.filterRelevantSstFiles(fwdDAGDifferentFiles, src.getTablePrefixes(), compactionNodeMap,
+src.getRocksDB(), dest.getRocksDB());
}

return new ArrayList<>(fwdDAGDifferentFiles);
}

-/**
- * construct absolute sst file path first and
- * filter the files.
- */
-public void filterRelevantSstFilesFullPath(Set<String> inputFiles,
-Map<String, String> tableToPrefixMap) throws IOException {
-for (Iterator<String> fileIterator =
-inputFiles.iterator(); fileIterator.hasNext();) {
-String filename = fileIterator.next();
-String filepath = getAbsoluteSstFilePath(filename);
-if (!RocksDiffUtils.doesSstFileContainKeyRange(filepath,
-tableToPrefixMap)) {
-fileIterator.remove();
-}
-}
-}

/**
* Core getSSTDiffList logic.
* <p>
@@ -1485,60 +1473,24 @@ public void pngPrintMutableGraph(String filePath, GraphType graphType)
graph.generateImage(filePath);
}

-private List<CompactionFileInfo> toFileInfoList(List<String> sstFiles,
-ManagedOptions options,
-ManagedReadOptions readOptions
-) {
+private Map<String, CompactionFileInfo> toFileInfoList(List<String> sstFiles, RocksDB db) {
if (CollectionUtils.isEmpty(sstFiles)) {
-return Collections.emptyList();
+return Collections.emptyMap();
}

-List<CompactionFileInfo> response = new ArrayList<>();

+Map<String, LiveFileMetaData> liveFileMetaDataMap = ManagedRocksDB.getLiveMetadataForSSTFiles(db);
+Map<String, CompactionFileInfo> response = new HashMap<>();
for (String sstFile : sstFiles) {
-CompactionFileInfo fileInfo = toFileInfo(sstFile, options, readOptions);
-response.add(fileInfo);
+String fileName = FilenameUtils.getBaseName(sstFile);
+CompactionFileInfo fileInfo =
+new CompactionFileInfo.Builder(fileName).setValues(liveFileMetaDataMap.get(fileName)).build();
+response.put(sstFile, fileInfo);
}
return response;
}

-private CompactionFileInfo toFileInfo(String sstFile,
-ManagedOptions options,
-ManagedReadOptions readOptions) {
-final int fileNameOffset = sstFile.lastIndexOf("/") + 1;
-String fileName = sstFile.substring(fileNameOffset,
-sstFile.length() - SST_FILE_EXTENSION_LENGTH);
-CompactionFileInfo.Builder fileInfoBuilder =
-new CompactionFileInfo.Builder(fileName);
-
-try (ManagedSstFileReader fileReader = new ManagedSstFileReader(options)) {
-fileReader.open(sstFile);
-String columnFamily = StringUtils.bytes2String(fileReader.getTableProperties().getColumnFamilyName());
-try (ManagedSstFileReaderIterator iterator =
-ManagedSstFileReaderIterator.managed(fileReader.newIterator(readOptions))) {
-iterator.get().seekToFirst();
-String startKey = StringUtils.bytes2String(iterator.get().key());
-iterator.get().seekToLast();
-String endKey = StringUtils.bytes2String(iterator.get().key());
-fileInfoBuilder.setStartRange(startKey)
-.setEndRange(endKey)
-.setColumnFamily(columnFamily);
-}
-} catch (RocksDBException rocksDBException) {
-// Ideally it should not happen. If it does just log the exception.
-// And let the compaction complete without the exception.
-// Throwing exception in compaction listener could fail the RocksDB.
-// In case of exception, compaction node will be missing start key,
-// end key and column family. And during diff calculation it will
-// continue the traversal as it was before HDDS-8940.
-LOG.warn("Failed to read SST file: {}.", sstFile, rocksDBException);
-}
-return fileInfoBuilder.build();
-}

@VisibleForTesting
-boolean shouldSkipNode(CompactionNode node,
-Map<String, String> columnFamilyToPrefixMap) {
+static boolean shouldSkipNode(CompactionNode node,
+Map<String, String> columnFamilyToPrefixMap) {
// This is for backward compatibility. Before the compaction log table
// migration, startKey, endKey and columnFamily information is not persisted
// in compaction log files.
Expand Down Expand Up @@ -1567,4 +1519,8 @@ boolean shouldSkipNode(CompactionNode node,
return !RocksDiffUtils.isKeyWithPrefixPresent(keyPrefix, node.getStartKey(),
node.getEndKey());
}

ConcurrentMap<String, CompactionFileInfo> getInflightCompactions() {
return inflightCompactions;
}
}