Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-11893. Fix full snapshot diff fallback logic because of DAG pruning #7549

Merged
merged 2 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -640,7 +641,7 @@ private String trimSSTFilename(String filename) {
* @param rocksDB open rocksDB instance.
* @return a list of SST files (without extension) in the DB.
*/
public HashSet<String> readRocksDBLiveFiles(ManagedRocksDB rocksDB) {
public Set<String> readRocksDBLiveFiles(ManagedRocksDB rocksDB) {
HashSet<String> liveFiles = new HashSet<>();

final List<String> cfs = Arrays.asList(
Expand Down Expand Up @@ -825,15 +826,15 @@ private String getSSTFullPath(String sstFilenameWithoutExtension,
* e.g. ["/path/to/sstBackupDir/000050.sst",
* "/path/to/sstBackupDir/000060.sst"]
*/
public synchronized List<String> getSSTDiffListWithFullPath(
public synchronized Optional<List<String>> getSSTDiffListWithFullPath(
DifferSnapshotInfo src,
DifferSnapshotInfo dest,
String sstFilesDirForSnapDiffJob
) throws IOException {

List<String> sstDiffList = getSSTDiffList(src, dest);
Optional<List<String>> sstDiffList = getSSTDiffList(src, dest);

return sstDiffList.stream()
return sstDiffList.map(diffList -> diffList.stream()
.map(
sst -> {
String sstFullPath = getSSTFullPath(sst, src.getDbPath());
Expand All @@ -843,7 +844,7 @@ public synchronized List<String> getSSTDiffListWithFullPath(
createLink(link, srcFile);
return link.toString();
})
.collect(Collectors.toList());
.collect(Collectors.toList()));
}

/**
Expand All @@ -857,10 +858,8 @@ public synchronized List<String> getSSTDiffListWithFullPath(
* @param dest destination snapshot
* @return A list of SST files without extension. e.g. ["000050", "000060"]
*/
public synchronized List<String> getSSTDiffList(
DifferSnapshotInfo src,
DifferSnapshotInfo dest
) throws IOException {
public synchronized Optional<List<String>> getSSTDiffList(DifferSnapshotInfo src,
DifferSnapshotInfo dest) throws IOException {

// TODO: Reject or swap if dest is taken after src, once snapshot chain
// integration is done.
Expand Down Expand Up @@ -894,12 +893,18 @@ public synchronized List<String> getSSTDiffList(
LOG.debug("{}", logSB);
}

// Check if the DAG traversal was able to reach all the destination SST files.
for (String destSnapFile : destSnapFiles) {
if (!fwdDAGSameFiles.contains(destSnapFile) && !fwdDAGDifferentFiles.contains(destSnapFile)) {
return Optional.empty();
}
}

if (src.getTablePrefixes() != null && !src.getTablePrefixes().isEmpty()) {
filterRelevantSstFilesFullPath(fwdDAGDifferentFiles,
src.getTablePrefixes());
}

return new ArrayList<>(fwdDAGDifferentFiles);
return Optional.of(new ArrayList<>(fwdDAGDifferentFiles));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -67,14 +68,18 @@
import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
import org.apache.ozone.compaction.log.CompactionFileInfo;
import org.apache.ozone.compaction.log.CompactionLogEntry;
import org.apache.ozone.rocksdb.util.RdbUtil;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.NodeComparator;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.LiveFileMetaData;
Expand All @@ -101,6 +106,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -231,6 +237,29 @@ public void cleanUp() {
}
}

private static List<CompactionLogEntry> getPrunedCompactionEntries(boolean prune) {
List<CompactionLogEntry> entries = new ArrayList<>();
if (!prune) {
entries.add(createCompactionEntry(1,
now(),
Arrays.asList("1", "2"),
Arrays.asList("4", "5")));
}
entries.addAll(Arrays.asList(createCompactionEntry(2,
now(),
Arrays.asList("4", "5"),
Collections.singletonList("10")),
createCompactionEntry(3,
now(),
Arrays.asList("3", "13", "14"),
Arrays.asList("6", "7")),
createCompactionEntry(4,
now(),
Arrays.asList("6", "7"),
Collections.singletonList("11"))));
return entries;
}

/**
* Test cases for testGetSSTDiffListWithoutDB.
*/
Expand Down Expand Up @@ -306,13 +335,17 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB() {
);

DifferSnapshotInfo snapshotInfo1 = new DifferSnapshotInfo(
"/path/to/dbcp1", UUID.randomUUID(), 3008L, null, null);
"/path/to/dbcp1", UUID.randomUUID(), 3008L, null, Mockito.mock(ManagedRocksDB.class));
DifferSnapshotInfo snapshotInfo2 = new DifferSnapshotInfo(
"/path/to/dbcp2", UUID.randomUUID(), 14980L, null, null);
"/path/to/dbcp2", UUID.randomUUID(), 14980L, null, Mockito.mock(ManagedRocksDB.class));
DifferSnapshotInfo snapshotInfo3 = new DifferSnapshotInfo(
"/path/to/dbcp3", UUID.randomUUID(), 17975L, null, null);
"/path/to/dbcp3", UUID.randomUUID(), 17975L, null, Mockito.mock(ManagedRocksDB.class));
DifferSnapshotInfo snapshotInfo4 = new DifferSnapshotInfo(
"/path/to/dbcp4", UUID.randomUUID(), 18000L, null, null);
"/path/to/dbcp4", UUID.randomUUID(), 18000L, null, Mockito.mock(ManagedRocksDB.class));
DifferSnapshotInfo snapshotInfo5 = new DifferSnapshotInfo(
"/path/to/dbcp2", UUID.randomUUID(), 0L, null, Mockito.mock(ManagedRocksDB.class));
DifferSnapshotInfo snapshotInfo6 = new DifferSnapshotInfo(
"/path/to/dbcp2", UUID.randomUUID(), 100L, null, Mockito.mock(ManagedRocksDB.class));

Set<String> snapshotSstFiles1 = ImmutableSet.of("000059", "000053");
Set<String> snapshotSstFiles2 = ImmutableSet.of("000088", "000059",
Expand Down Expand Up @@ -340,6 +373,8 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB() {
snapshotSstFiles3,
snapshotSstFiles1,
ImmutableSet.of("000059", "000053"),
ImmutableSet.of("000066", "000105", "000080", "000087", "000073",
"000095"),
ImmutableSet.of("000066", "000105", "000080", "000087", "000073",
"000095"),
false),
Expand All @@ -354,6 +389,7 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB() {
snapshotSstFiles3,
ImmutableSet.of("000088", "000105", "000059", "000053", "000095"),
ImmutableSet.of("000108"),
ImmutableSet.of("000108"),
false),
Arguments.of("Test 3: Compaction log file crafted input: " +
"Same SST files found during SST expansion",
Expand All @@ -365,6 +401,7 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB() {
snapshotSstFiles1Alt1,
ImmutableSet.of("000066", "000059", "000053"),
ImmutableSet.of("000080", "000087", "000073", "000095"),
ImmutableSet.of("000080", "000087", "000073", "000095"),
false),
Arguments.of("Test 4: Compaction log file crafted input: " +
"Skipping known processed SST.",
Expand All @@ -376,6 +413,7 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB() {
snapshotSstFiles1Alt2,
Collections.emptySet(),
Collections.emptySet(),
Collections.emptySet(),
true),
Arguments.of("Test 5: Compaction log file hit snapshot" +
" generation early exit condition",
Expand All @@ -387,6 +425,7 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB() {
snapshotSstFiles1,
ImmutableSet.of("000059", "000053"),
ImmutableSet.of("000066", "000080", "000087", "000073", "000062"),
ImmutableSet.of("000066", "000080", "000087", "000073", "000062"),
false),
Arguments.of("Test 6: Compaction log table regular case. " +
"Expands expandable SSTs in the initial diff.",
Expand All @@ -397,6 +436,8 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB() {
snapshotSstFiles3,
snapshotSstFiles1,
ImmutableSet.of("000059", "000053"),
ImmutableSet.of("000066", "000105", "000080", "000087", "000073",
"000095"),
ImmutableSet.of("000066", "000105", "000080", "000087", "000073",
"000095"),
false),
Expand All @@ -411,6 +452,7 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB() {
snapshotSstFiles3,
ImmutableSet.of("000088", "000105", "000059", "000053", "000095"),
ImmutableSet.of("000108"),
ImmutableSet.of("000108"),
false),
Arguments.of("Test 8: Compaction log table crafted input: " +
"Same SST files found during SST expansion",
Expand All @@ -422,6 +464,7 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB() {
snapshotSstFiles1Alt1,
ImmutableSet.of("000066", "000059", "000053"),
ImmutableSet.of("000080", "000087", "000073", "000095"),
ImmutableSet.of("000080", "000087", "000073", "000095"),
false),
Arguments.of("Test 9: Compaction log table crafted input: " +
"Skipping known processed SST.",
Expand All @@ -433,6 +476,7 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB() {
snapshotSstFiles1Alt2,
Collections.emptySet(),
Collections.emptySet(),
Collections.emptySet(),
true),
Arguments.of("Test 10: Compaction log table hit snapshot " +
"generation early exit condition",
Expand All @@ -444,7 +488,33 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB() {
snapshotSstFiles1,
ImmutableSet.of("000059", "000053"),
ImmutableSet.of("000066", "000080", "000087", "000073", "000062"),
ImmutableSet.of("000066", "000080", "000087", "000073", "000062"),
false),
Arguments.of("Test 11: Older Compaction log got pruned and source snapshot delta files would be " +
"unreachable",
null,
getPrunedCompactionEntries(false),
snapshotInfo6,
snapshotInfo5,
ImmutableSet.of("10", "11", "8", "9", "12"),
ImmutableSet.of("1", "3", "13", "14"),
ImmutableSet.of("1", "3", "13", "14"),
ImmutableSet.of("2", "8", "9", "12"),
ImmutableSet.of("2", "8", "9", "12"),
false),
Arguments.of("Test 12: Older Compaction log got pruned and source snapshot delta files would be " +
"unreachable",
null,
getPrunedCompactionEntries(true),
snapshotInfo6,
snapshotInfo5,
ImmutableSet.of("10", "11", "8", "9", "12"),
ImmutableSet.of("1", "3", "13", "14"),
ImmutableSet.of("3", "13", "14"),
ImmutableSet.of("4", "5", "8", "9", "12"),
null,
false)

);
}

Expand All @@ -464,10 +534,10 @@ public void testGetSSTDiffListWithoutDB(String description,
Set<String> destSnapshotSstFiles,
Set<String> expectedSameSstFiles,
Set<String> expectedDiffSstFiles,
boolean expectingException) {
Set<String> expectedSSTDiffFiles,
boolean expectingException) throws IOException {

boolean exceptionThrown = false;

if (compactionLog != null) {
// Construct DAG from compaction log input
Arrays.stream(compactionLog.split("\n")).forEach(
Expand Down Expand Up @@ -500,10 +570,41 @@ public void testGetSSTDiffListWithoutDB(String description,
}
}

if (expectingException && !exceptionThrown) {
fail("Expecting exception but none thrown.");
}

// Check same and different SST files result
assertEquals(expectedSameSstFiles, actualSameSstFiles);
assertEquals(expectedDiffSstFiles, actualDiffSstFiles);

try (MockedStatic<RdbUtil> mockedHandler = Mockito.mockStatic(RdbUtil.class, Mockito.CALLS_REAL_METHODS)) {
RocksDB rocksDB = Mockito.mock(RocksDB.class);
Mockito.when(rocksDB.getName()).thenReturn("dummy");
Mockito.when(srcSnapshot.getRocksDB().get()).thenReturn(rocksDB);
Mockito.when(destSnapshot.getRocksDB().get()).thenReturn(rocksDB);
mockedHandler.when(() -> RdbUtil.getLiveSSTFilesForCFs(any(), any()))
.thenAnswer(i -> {
Set<String> sstFiles = i.getArgument(0).equals(srcSnapshot.getRocksDB()) ? srcSnapshotSstFiles
: destSnapshotSstFiles;
return sstFiles.stream().map(fileName -> {
LiveFileMetaData liveFileMetaData = Mockito.mock(LiveFileMetaData.class);
Mockito.when(liveFileMetaData.fileName()).thenReturn("/" + fileName + SST_FILE_EXTENSION);
return liveFileMetaData;
}).collect(Collectors.toList());
});
try {
Assertions.assertEquals(Optional.ofNullable(expectedSSTDiffFiles)
.map(files -> files.stream().sorted().collect(Collectors.toList())).orElse(null),
rocksDBCheckpointDiffer.getSSTDiffList(srcSnapshot, destSnapshot)
.map(i -> i.stream().sorted().collect(Collectors.toList())).orElse(null));
} catch (RuntimeException rtEx) {
if (!expectingException) {
fail("Unexpected exception thrown in test.");
} else {
exceptionThrown = true;
}
}
}
if (expectingException && !exceptionThrown) {
fail("Expecting exception but none thrown.");
}
Expand Down Expand Up @@ -587,7 +688,7 @@ void diffAllSnapshots(RocksDBCheckpointDiffer differ)
int index = 0;
for (DifferSnapshotInfo snap : snapshots) {
// Returns a list of SST files to be fed into RocksDiff
List<String> sstDiffList = differ.getSSTDiffList(src, snap);
List<String> sstDiffList = differ.getSSTDiffList(src, snap).orElse(Collections.emptyList());
LOG.info("SST diff list from '{}' to '{}': {}",
src.getDbPath(), snap.getDbPath(), sstDiffList);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -231,7 +232,7 @@ public void testDAGReconstruction()
final File checkpointSnap2 = new File(snap2.getDbPath());
GenericTestUtils.waitFor(checkpointSnap2::exists, 2000, 20000);

List<String> sstDiffList21 = differ.getSSTDiffList(snap2, snap1);
List<String> sstDiffList21 = differ.getSSTDiffList(snap2, snap1).orElse(Collections.emptyList());
LOG.debug("Got diff list: {}", sstDiffList21);

// Delete 1000 keys, take a 3rd snapshot, and do another diff
Expand All @@ -250,13 +251,13 @@ public void testDAGReconstruction()
final File checkpointSnap3 = new File(snap3.getDbPath());
GenericTestUtils.waitFor(checkpointSnap3::exists, 2000, 20000);

List<String> sstDiffList32 = differ.getSSTDiffList(snap3, snap2);
List<String> sstDiffList32 = differ.getSSTDiffList(snap3, snap2).orElse(Collections.emptyList());

// snap3-snap1 diff result is a combination of snap3-snap2 and snap2-snap1
List<String> sstDiffList31 = differ.getSSTDiffList(snap3, snap1);
List<String> sstDiffList31 = differ.getSSTDiffList(snap3, snap1).orElse(Collections.emptyList());

// Same snapshot. Result should be empty list
List<String> sstDiffList22 = differ.getSSTDiffList(snap2, snap2);
List<String> sstDiffList22 = differ.getSSTDiffList(snap2, snap2).orElse(Collections.emptyList());
assertThat(sstDiffList22).isEmpty();
snapDB1.close();
snapDB2.close();
Expand All @@ -282,13 +283,13 @@ public void testDAGReconstruction()
volumeName, bucketName, "snap3",
((RDBStore) snapDB3.get()
.getMetadataManager().getStore()).getDb().getManagedRocksDb());
List<String> sstDiffList21Run2 = differ.getSSTDiffList(snap2, snap1);
List<String> sstDiffList21Run2 = differ.getSSTDiffList(snap2, snap1).orElse(Collections.emptyList());
assertEquals(sstDiffList21, sstDiffList21Run2);

List<String> sstDiffList32Run2 = differ.getSSTDiffList(snap3, snap2);
List<String> sstDiffList32Run2 = differ.getSSTDiffList(snap3, snap2).orElse(Collections.emptyList());
assertEquals(sstDiffList32, sstDiffList32Run2);

List<String> sstDiffList31Run2 = differ.getSSTDiffList(snap3, snap1);
List<String> sstDiffList31Run2 = differ.getSSTDiffList(snap3, snap1).orElse(Collections.emptyList());
assertEquals(sstDiffList31, sstDiffList31Run2);
snapDB1.close();
snapDB2.close();
Expand Down
Loading
Loading