Skip to content

Commit

Permalink
HDDS-11893. Fix full snapshot diff fallback logic because of DAG prun…
Browse files Browse the repository at this point in the history
…ing (#7549)
  • Loading branch information
swamirishi authored Dec 12, 2024
1 parent b5d04e2 commit 853d657
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -640,7 +641,7 @@ private String trimSSTFilename(String filename) {
* @param rocksDB open rocksDB instance.
* @return the set of SST files (without extension) in the DB.
*/
public HashSet<String> readRocksDBLiveFiles(ManagedRocksDB rocksDB) {
public Set<String> readRocksDBLiveFiles(ManagedRocksDB rocksDB) {
HashSet<String> liveFiles = new HashSet<>();

final List<String> cfs = Arrays.asList(
Expand Down Expand Up @@ -825,15 +826,15 @@ private String getSSTFullPath(String sstFilenameWithoutExtension,
* e.g. ["/path/to/sstBackupDir/000050.sst",
* "/path/to/sstBackupDir/000060.sst"]
*/
public synchronized List<String> getSSTDiffListWithFullPath(
public synchronized Optional<List<String>> getSSTDiffListWithFullPath(
DifferSnapshotInfo src,
DifferSnapshotInfo dest,
String sstFilesDirForSnapDiffJob
) throws IOException {

List<String> sstDiffList = getSSTDiffList(src, dest);
Optional<List<String>> sstDiffList = getSSTDiffList(src, dest);

return sstDiffList.stream()
return sstDiffList.map(diffList -> diffList.stream()
.map(
sst -> {
String sstFullPath = getSSTFullPath(sst, src.getDbPath());
Expand All @@ -843,7 +844,7 @@ public synchronized List<String> getSSTDiffListWithFullPath(
createLink(link, srcFile);
return link.toString();
})
.collect(Collectors.toList());
.collect(Collectors.toList()));
}

/**
Expand All @@ -857,10 +858,8 @@ public synchronized List<String> getSSTDiffListWithFullPath(
* @param dest destination snapshot
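* @return An Optional holding the list of SST files without extension,
* e.g. ["000050", "000060"]; empty when the DAG traversal could not reach
* every SST file of the destination snapshot.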
*/
public synchronized List<String> getSSTDiffList(
DifferSnapshotInfo src,
DifferSnapshotInfo dest
) throws IOException {
public synchronized Optional<List<String>> getSSTDiffList(DifferSnapshotInfo src,
DifferSnapshotInfo dest) throws IOException {

// TODO: Reject or swap if dest is taken after src, once snapshot chain
// integration is done.
Expand Down Expand Up @@ -894,12 +893,18 @@ public synchronized List<String> getSSTDiffList(
LOG.debug("{}", logSB);
}

// Check if the DAG traversal was able to reach all the destination SST files.
for (String destSnapFile : destSnapFiles) {
if (!fwdDAGSameFiles.contains(destSnapFile) && !fwdDAGDifferentFiles.contains(destSnapFile)) {
return Optional.empty();
}
}

if (src.getTablePrefixes() != null && !src.getTablePrefixes().isEmpty()) {
filterRelevantSstFilesFullPath(fwdDAGDifferentFiles,
src.getTablePrefixes());
}

return new ArrayList<>(fwdDAGDifferentFiles);
return Optional.of(new ArrayList<>(fwdDAGDifferentFiles));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -67,14 +68,18 @@
import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
import org.apache.ozone.compaction.log.CompactionFileInfo;
import org.apache.ozone.compaction.log.CompactionLogEntry;
import org.apache.ozone.rocksdb.util.RdbUtil;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.NodeComparator;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.LiveFileMetaData;
Expand All @@ -101,6 +106,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -231,6 +237,29 @@ public void cleanUp() {
}
}

/**
 * Builds the compaction log entries fed to the pruned-compaction-log test cases.
 *
 * Entries 2, 3 and 4 are always present. Entry 1 (file lists ["1", "2"] and
 * ["4", "5"]) is only included when {@code prune} is false, which models the
 * oldest compaction entry having been pruned away.
 * NOTE(review): the two file lists are presumably compaction inputs and
 * outputs respectively; confirm against createCompactionEntry.
 *
 * @param prune true to leave out the first (oldest) compaction entry.
 * @return a mutable list of compaction entries in creation order.
 */
private static List<CompactionLogEntry> getPrunedCompactionEntries(boolean prune) {
  final List<CompactionLogEntry> compactionEntries = new ArrayList<>();

  // The oldest entry is the one that disappears when the log is pruned.
  if (!prune) {
    compactionEntries.add(
        createCompactionEntry(1, now(), Arrays.asList("1", "2"), Arrays.asList("4", "5")));
  }

  // Remaining entries are appended one by one, in the same order as before,
  // so that the now() calls happen in the same sequence.
  compactionEntries.add(
      createCompactionEntry(2, now(), Arrays.asList("4", "5"), Collections.singletonList("10")));
  compactionEntries.add(
      createCompactionEntry(3, now(), Arrays.asList("3", "13", "14"), Arrays.asList("6", "7")));
  compactionEntries.add(
      createCompactionEntry(4, now(), Arrays.asList("6", "7"), Collections.singletonList("11")));

  return compactionEntries;
}

/**
* Test cases for testGetSSTDiffListWithoutDB.
*/
Expand Down Expand Up @@ -306,13 +335,17 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB() {
);

DifferSnapshotInfo snapshotInfo1 = new DifferSnapshotInfo(
"/path/to/dbcp1", UUID.randomUUID(), 3008L, null, null);
"/path/to/dbcp1", UUID.randomUUID(), 3008L, null, Mockito.mock(ManagedRocksDB.class));
DifferSnapshotInfo snapshotInfo2 = new DifferSnapshotInfo(
"/path/to/dbcp2", UUID.randomUUID(), 14980L, null, null);
"/path/to/dbcp2", UUID.randomUUID(), 14980L, null, Mockito.mock(ManagedRocksDB.class));
DifferSnapshotInfo snapshotInfo3 = new DifferSnapshotInfo(
"/path/to/dbcp3", UUID.randomUUID(), 17975L, null, null);
"/path/to/dbcp3", UUID.randomUUID(), 17975L, null, Mockito.mock(ManagedRocksDB.class));
DifferSnapshotInfo snapshotInfo4 = new DifferSnapshotInfo(
"/path/to/dbcp4", UUID.randomUUID(), 18000L, null, null);
"/path/to/dbcp4", UUID.randomUUID(), 18000L, null, Mockito.mock(ManagedRocksDB.class));
DifferSnapshotInfo snapshotInfo5 = new DifferSnapshotInfo(
"/path/to/dbcp2", UUID.randomUUID(), 0L, null, Mockito.mock(ManagedRocksDB.class));
DifferSnapshotInfo snapshotInfo6 = new DifferSnapshotInfo(
"/path/to/dbcp2", UUID.randomUUID(), 100L, null, Mockito.mock(ManagedRocksDB.class));

Set<String> snapshotSstFiles1 = ImmutableSet.of("000059", "000053");
Set<String> snapshotSstFiles2 = ImmutableSet.of("000088", "000059",
Expand Down Expand Up @@ -340,6 +373,8 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB() {
snapshotSstFiles3,
snapshotSstFiles1,
ImmutableSet.of("000059", "000053"),
ImmutableSet.of("000066", "000105", "000080", "000087", "000073",
"000095"),
ImmutableSet.of("000066", "000105", "000080", "000087", "000073",
"000095"),
false),
Expand All @@ -354,6 +389,7 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB() {
snapshotSstFiles3,
ImmutableSet.of("000088", "000105", "000059", "000053", "000095"),
ImmutableSet.of("000108"),
ImmutableSet.of("000108"),
false),
Arguments.of("Test 3: Compaction log file crafted input: " +
"Same SST files found during SST expansion",
Expand All @@ -365,6 +401,7 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB() {
snapshotSstFiles1Alt1,
ImmutableSet.of("000066", "000059", "000053"),
ImmutableSet.of("000080", "000087", "000073", "000095"),
ImmutableSet.of("000080", "000087", "000073", "000095"),
false),
Arguments.of("Test 4: Compaction log file crafted input: " +
"Skipping known processed SST.",
Expand All @@ -376,6 +413,7 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB() {
snapshotSstFiles1Alt2,
Collections.emptySet(),
Collections.emptySet(),
Collections.emptySet(),
true),
Arguments.of("Test 5: Compaction log file hit snapshot" +
" generation early exit condition",
Expand All @@ -387,6 +425,7 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB() {
snapshotSstFiles1,
ImmutableSet.of("000059", "000053"),
ImmutableSet.of("000066", "000080", "000087", "000073", "000062"),
ImmutableSet.of("000066", "000080", "000087", "000073", "000062"),
false),
Arguments.of("Test 6: Compaction log table regular case. " +
"Expands expandable SSTs in the initial diff.",
Expand All @@ -397,6 +436,8 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB() {
snapshotSstFiles3,
snapshotSstFiles1,
ImmutableSet.of("000059", "000053"),
ImmutableSet.of("000066", "000105", "000080", "000087", "000073",
"000095"),
ImmutableSet.of("000066", "000105", "000080", "000087", "000073",
"000095"),
false),
Expand All @@ -411,6 +452,7 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB() {
snapshotSstFiles3,
ImmutableSet.of("000088", "000105", "000059", "000053", "000095"),
ImmutableSet.of("000108"),
ImmutableSet.of("000108"),
false),
Arguments.of("Test 8: Compaction log table crafted input: " +
"Same SST files found during SST expansion",
Expand All @@ -422,6 +464,7 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB() {
snapshotSstFiles1Alt1,
ImmutableSet.of("000066", "000059", "000053"),
ImmutableSet.of("000080", "000087", "000073", "000095"),
ImmutableSet.of("000080", "000087", "000073", "000095"),
false),
Arguments.of("Test 9: Compaction log table crafted input: " +
"Skipping known processed SST.",
Expand All @@ -433,6 +476,7 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB() {
snapshotSstFiles1Alt2,
Collections.emptySet(),
Collections.emptySet(),
Collections.emptySet(),
true),
Arguments.of("Test 10: Compaction log table hit snapshot " +
"generation early exit condition",
Expand All @@ -444,7 +488,33 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB() {
snapshotSstFiles1,
ImmutableSet.of("000059", "000053"),
ImmutableSet.of("000066", "000080", "000087", "000073", "000062"),
ImmutableSet.of("000066", "000080", "000087", "000073", "000062"),
false),
Arguments.of("Test 11: Older Compaction log got pruned and source snapshot delta files would be " +
"unreachable",
null,
getPrunedCompactionEntries(false),
snapshotInfo6,
snapshotInfo5,
ImmutableSet.of("10", "11", "8", "9", "12"),
ImmutableSet.of("1", "3", "13", "14"),
ImmutableSet.of("1", "3", "13", "14"),
ImmutableSet.of("2", "8", "9", "12"),
ImmutableSet.of("2", "8", "9", "12"),
false),
Arguments.of("Test 12: Older Compaction log got pruned and source snapshot delta files would be " +
"unreachable",
null,
getPrunedCompactionEntries(true),
snapshotInfo6,
snapshotInfo5,
ImmutableSet.of("10", "11", "8", "9", "12"),
ImmutableSet.of("1", "3", "13", "14"),
ImmutableSet.of("3", "13", "14"),
ImmutableSet.of("4", "5", "8", "9", "12"),
null,
false)

);
}

Expand All @@ -464,10 +534,10 @@ public void testGetSSTDiffListWithoutDB(String description,
Set<String> destSnapshotSstFiles,
Set<String> expectedSameSstFiles,
Set<String> expectedDiffSstFiles,
boolean expectingException) {
Set<String> expectedSSTDiffFiles,
boolean expectingException) throws IOException {

boolean exceptionThrown = false;

if (compactionLog != null) {
// Construct DAG from compaction log input
Arrays.stream(compactionLog.split("\n")).forEach(
Expand Down Expand Up @@ -500,10 +570,41 @@ public void testGetSSTDiffListWithoutDB(String description,
}
}

if (expectingException && !exceptionThrown) {
fail("Expecting exception but none thrown.");
}

// Check same and different SST files result
assertEquals(expectedSameSstFiles, actualSameSstFiles);
assertEquals(expectedDiffSstFiles, actualDiffSstFiles);

try (MockedStatic<RdbUtil> mockedHandler = Mockito.mockStatic(RdbUtil.class, Mockito.CALLS_REAL_METHODS)) {
RocksDB rocksDB = Mockito.mock(RocksDB.class);
Mockito.when(rocksDB.getName()).thenReturn("dummy");
Mockito.when(srcSnapshot.getRocksDB().get()).thenReturn(rocksDB);
Mockito.when(destSnapshot.getRocksDB().get()).thenReturn(rocksDB);
mockedHandler.when(() -> RdbUtil.getLiveSSTFilesForCFs(any(), any()))
.thenAnswer(i -> {
Set<String> sstFiles = i.getArgument(0).equals(srcSnapshot.getRocksDB()) ? srcSnapshotSstFiles
: destSnapshotSstFiles;
return sstFiles.stream().map(fileName -> {
LiveFileMetaData liveFileMetaData = Mockito.mock(LiveFileMetaData.class);
Mockito.when(liveFileMetaData.fileName()).thenReturn("/" + fileName + SST_FILE_EXTENSION);
return liveFileMetaData;
}).collect(Collectors.toList());
});
try {
Assertions.assertEquals(Optional.ofNullable(expectedSSTDiffFiles)
.map(files -> files.stream().sorted().collect(Collectors.toList())).orElse(null),
rocksDBCheckpointDiffer.getSSTDiffList(srcSnapshot, destSnapshot)
.map(i -> i.stream().sorted().collect(Collectors.toList())).orElse(null));
} catch (RuntimeException rtEx) {
if (!expectingException) {
fail("Unexpected exception thrown in test.");
} else {
exceptionThrown = true;
}
}
}
if (expectingException && !exceptionThrown) {
fail("Expecting exception but none thrown.");
}
Expand Down Expand Up @@ -587,7 +688,7 @@ void diffAllSnapshots(RocksDBCheckpointDiffer differ)
int index = 0;
for (DifferSnapshotInfo snap : snapshots) {
// Returns a list of SST files to be fed into RocksDiff
List<String> sstDiffList = differ.getSSTDiffList(src, snap);
List<String> sstDiffList = differ.getSSTDiffList(src, snap).orElse(Collections.emptyList());
LOG.info("SST diff list from '{}' to '{}': {}",
src.getDbPath(), snap.getDbPath(), sstDiffList);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -231,7 +232,7 @@ public void testDAGReconstruction()
final File checkpointSnap2 = new File(snap2.getDbPath());
GenericTestUtils.waitFor(checkpointSnap2::exists, 2000, 20000);

List<String> sstDiffList21 = differ.getSSTDiffList(snap2, snap1);
List<String> sstDiffList21 = differ.getSSTDiffList(snap2, snap1).orElse(Collections.emptyList());
LOG.debug("Got diff list: {}", sstDiffList21);

// Delete 1000 keys, take a 3rd snapshot, and do another diff
Expand All @@ -250,13 +251,13 @@ public void testDAGReconstruction()
final File checkpointSnap3 = new File(snap3.getDbPath());
GenericTestUtils.waitFor(checkpointSnap3::exists, 2000, 20000);

List<String> sstDiffList32 = differ.getSSTDiffList(snap3, snap2);
List<String> sstDiffList32 = differ.getSSTDiffList(snap3, snap2).orElse(Collections.emptyList());

// snap3-snap1 diff result is a combination of snap3-snap2 and snap2-snap1
List<String> sstDiffList31 = differ.getSSTDiffList(snap3, snap1);
List<String> sstDiffList31 = differ.getSSTDiffList(snap3, snap1).orElse(Collections.emptyList());

// Same snapshot. Result should be empty list
List<String> sstDiffList22 = differ.getSSTDiffList(snap2, snap2);
List<String> sstDiffList22 = differ.getSSTDiffList(snap2, snap2).orElse(Collections.emptyList());
assertThat(sstDiffList22).isEmpty();
snapDB1.close();
snapDB2.close();
Expand All @@ -282,13 +283,13 @@ public void testDAGReconstruction()
volumeName, bucketName, "snap3",
((RDBStore) snapDB3.get()
.getMetadataManager().getStore()).getDb().getManagedRocksDb());
List<String> sstDiffList21Run2 = differ.getSSTDiffList(snap2, snap1);
List<String> sstDiffList21Run2 = differ.getSSTDiffList(snap2, snap1).orElse(Collections.emptyList());
assertEquals(sstDiffList21, sstDiffList21Run2);

List<String> sstDiffList32Run2 = differ.getSSTDiffList(snap3, snap2);
List<String> sstDiffList32Run2 = differ.getSSTDiffList(snap3, snap2).orElse(Collections.emptyList());
assertEquals(sstDiffList32, sstDiffList32Run2);

List<String> sstDiffList31Run2 = differ.getSSTDiffList(snap3, snap1);
List<String> sstDiffList31Run2 = differ.getSSTDiffList(snap3, snap1).orElse(Collections.emptyList());
assertEquals(sstDiffList31, sstDiffList31Run2);
snapDB1.close();
snapDB2.close();
Expand Down
Loading

0 comments on commit 853d657

Please sign in to comment.