Skip to content

Commit

Permalink
HDDS-9039. Removed the pause and wait in RocksDB compaction when tarb…
Browse files Browse the repository at this point in the history
…all creation is in progress (apache#6552)
  • Loading branch information
hemantk-12 authored May 13, 2024
1 parent c5eb2ac commit b11b807
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.collections.CollectionUtils;
Expand Down Expand Up @@ -170,7 +169,6 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
= new BootstrapStateHandler.Lock();

private ColumnFamilyHandle snapshotInfoTableCFHandle;
private final AtomicInteger tarballRequestCount;
private static final String DAG_PRUNING_SERVICE_NAME = "CompactionDagPruningService";
private AtomicBoolean suspended;

Expand Down Expand Up @@ -247,7 +245,6 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
} else {
this.scheduler = null;
}
this.tarballRequestCount = new AtomicInteger(0);
}

private String createCompactionLogDir(String metadataDirName,
Expand Down Expand Up @@ -517,8 +514,6 @@ public void onCompactionCompleted(RocksDB db,
return;
}

waitForTarballCreation();

// Add the compaction log entry to Compaction log table.
addToCompactionLogTable(compactionLogEntry);

Expand Down Expand Up @@ -559,22 +554,6 @@ void addToCompactionLogTable(CompactionLogEntry compactionLogEntry) {
}
}

/**
* Check if there is any in_progress tarball creation request and wait till
* all tarball creation finish, and it gets notified.
*/
private void waitForTarballCreation() {
while (tarballRequestCount.get() != 0) {
try {
wait(Integer.MAX_VALUE);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Compaction log thread {} is interrupted.",
Thread.currentThread().getName());
}
}
}

/**
* Creates a hard link between provided link and source.
* It doesn't throw any exception if {@link Files#createLink} throws
Expand Down Expand Up @@ -1424,28 +1403,10 @@ public void pruneSstFiles() {
}
}

public void incrementTarballRequestCount() {
tarballRequestCount.incrementAndGet();
}

public void decrementTarballRequestCountAndNotify() {
// Synchronized block is used to ensure that lock is on the same instance notifyAll is being called.
synchronized (this) {
tarballRequestCount.decrementAndGet();
// Notify compaction threads to continue.
notifyAll();
}
}

public boolean shouldRun() {
return !suspended.get();
}

@VisibleForTesting
public int getTarballRequestCount() {
return tarballRequestCount.get();
}

@VisibleForTesting
public boolean debugEnabled(Integer level) {
return DEBUG_LEVEL.contains(level);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,13 +414,8 @@ public void testWriteDbDataToStream() throws Exception {
Path expectedLog = Paths.get(compactionLogDir, "expected" +
COMPACTION_LOG_FILE_NAME_SUFFIX);
String expectedLogStr = truncateFileName(metaDirLength, expectedLog);
Path unExpectedLog = Paths.get(compactionLogDir, "unexpected" +
COMPACTION_LOG_FILE_NAME_SUFFIX);
String unExpectedLogStr = truncateFileName(metaDirLength, unExpectedLog);
Path expectedSst = Paths.get(sstBackupDir, "expected.sst");
String expectedSstStr = truncateFileName(metaDirLength, expectedSst);
Path unExpectedSst = Paths.get(sstBackupDir, "unexpected.sst");
String unExpectedSstStr = truncateFileName(metaDirLength, unExpectedSst);

// put "expected" fabricated files onto the fs before the files get
// copied to the temp dir.
Expand All @@ -436,15 +431,6 @@ public void testWriteDbDataToStream() throws Exception {
// with the snapshot data.
doNothing().when(checkpoint).cleanupCheckpoint();
realCheckpoint.set(checkpoint);

// put "unexpected" fabricated files onto the fs after the files
// get copied to the temp dir. Since these appear in the "real"
// dir after the copy, they shouldn't exist in the final file
// set. That will show that the copy only happened from the temp dir.
Files.write(unExpectedLog,
"fabricatedData".getBytes(StandardCharsets.UTF_8));
Files.write(unExpectedSst,
"fabricatedData".getBytes(StandardCharsets.UTF_8));
return checkpoint;
});

Expand All @@ -460,10 +446,6 @@ public void testWriteDbDataToStream() throws Exception {
long tmpHardLinkFileCount = tmpHardLinkFileCount();
omDbCheckpointServletMock.doGet(requestMock, responseMock);
assertEquals(tmpHardLinkFileCount, tmpHardLinkFileCount());

// Verify that tarball request count reaches to zero once doGet completes.
assertEquals(0,
dbStore.getRocksDBCheckpointDiffer().getTarballRequestCount());
dbCheckpoint = realCheckpoint.get();

// Untar the file into a temp folder to be examined.
Expand Down Expand Up @@ -528,15 +510,7 @@ public void testWriteDbDataToStream() throws Exception {
getFiles(Paths.get(metaDir.toString(), OM_SNAPSHOT_DIR), metaDirLength);
assertThat(finalFullSet).contains(expectedLogStr);
assertThat(finalFullSet).contains(expectedSstStr);
assertThat(initialFullSet).contains(unExpectedLogStr);
assertThat(initialFullSet).contains(unExpectedSstStr);

// Remove the dummy files that should not have been copied over
// from the expected data.
initialFullSet.remove(unExpectedLogStr);
initialFullSet.remove(unExpectedSstStr);
assertEquals(initialFullSet, finalFullSet,
"expected snapshot files not found");
assertEquals(initialFullSet, finalFullSet, "expected snapshot files not found");
}

private static long tmpHardLinkFileCount() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,43 +222,24 @@ public static Map<Path, Path> normalizeExcludeList(
}

/**
* Pauses rocksdb compaction threads while creating copies of
* compaction logs and hard links of sst backups.
* Copies compaction logs and hard links of sst backups to tmpDir.
* @param tmpdir - Place to create copies/links
* @param flush - Whether to flush the db or not.
* @return Checkpoint containing snapshot entries expected.
*/
@Override
public DBCheckpoint getCheckpoint(Path tmpdir, boolean flush)
throws IOException {
DBCheckpoint checkpoint;

public DBCheckpoint getCheckpoint(Path tmpdir, boolean flush) throws IOException {
// make tmp directories to contain the copies
RocksDBCheckpointDiffer differ = getDbStore().getRocksDBCheckpointDiffer();
DirectoryData sstBackupDir = new DirectoryData(tmpdir,
differ.getSSTBackupDir());
DirectoryData compactionLogDir = new DirectoryData(tmpdir,
differ.getCompactionLogDir());

long startTime = System.currentTimeMillis();
long pauseCounter = PAUSE_COUNTER.incrementAndGet();

try {
LOG.info("Compaction pausing {} started.", pauseCounter);
// Pause compactions, Copy/link files and get checkpoint.
differ.incrementTarballRequestCount();
FileUtils.copyDirectory(compactionLogDir.getOriginalDir(),
compactionLogDir.getTmpDir());
OmSnapshotUtils.linkFiles(sstBackupDir.getOriginalDir(),
sstBackupDir.getTmpDir());
checkpoint = getDbStore().getCheckpoint(flush);
} finally {
// Unpause the compaction threads.
differ.decrementTarballRequestCountAndNotify();
long elapsedTime = System.currentTimeMillis() - startTime;
LOG.info("Compaction pausing {} ended. Elapsed ms: {}", pauseCounter, elapsedTime);
}
return checkpoint;
DirectoryData sstBackupDir = new DirectoryData(tmpdir, differ.getSSTBackupDir());
DirectoryData compactionLogDir = new DirectoryData(tmpdir, differ.getCompactionLogDir());

// Create checkpoint and then copy the files so that it has all the compaction entries and files.
DBCheckpoint dbCheckpoint = getDbStore().getCheckpoint(flush);
FileUtils.copyDirectory(compactionLogDir.getOriginalDir(), compactionLogDir.getTmpDir());
OmSnapshotUtils.linkFiles(sstBackupDir.getOriginalDir(), sstBackupDir.getTmpDir());

return dbCheckpoint;
}


Expand Down

0 comments on commit b11b807

Please sign in to comment.