
Commit

Modified approach v2
StefanRRichter committed Feb 5, 2024
1 parent 2073f7f commit 720e6a8
Showing 3 changed files with 119 additions and 127 deletions.
@@ -169,23 +169,8 @@ private static void deleteRange(
}
}

/**
* Clips and compacts the given database to the given key-group range. Any entries outside this
* range will be completely deleted (including tombstones).
*
* @param db the target DB to be clipped.
* @param columnFamilyHandles the column families to be clipped.
* @param keyGroupPrefixBytes the number of bytes required to represent all key-groups under the
* current max parallelism.
* @param dbExpectedKeyGroupRange the expected key-group range of the RocksDB instance.
*/
public static void compactSstFilesToExpectedRange(
RocksDB db,
List<ColumnFamilyHandle> columnFamilyHandles,
int keyGroupPrefixBytes,
KeyGroupRange dbExpectedKeyGroupRange)
throws Exception {

public static boolean isSstDataInKeyGroupRange(
RocksDB db, int keyGroupPrefixBytes, KeyGroupRange dbExpectedKeyGroupRange) {
final byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes];
final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes];

@@ -195,34 +180,10 @@ public static void compactSstFilesToExpectedRange(
CompositeKeySerializationUtils.serializeKeyGroup(
dbExpectedKeyGroupRange.getEndKeyGroup() + 1, endKeyGroupBytes);

Comparator<byte[]> comparator = UnsignedBytes.lexicographicalComparator();
KeyRange dbKeyRange = getDBKeyRange(db);

boolean clipMinRangeRequired =
comparator.compare(dbKeyRange.minKey, beginKeyGroupBytes) < 0;
boolean clipMaxRangeRequired = comparator.compare(dbKeyRange.maxKey, endKeyGroupBytes) >= 0;

if (clipMinRangeRequired || clipMaxRangeRequired) {
for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
db.clipColumnFamily(columnFamilyHandle, beginKeyGroupBytes, endKeyGroupBytes);
}

if (clipMinRangeRequired) {
for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
db.compactRange(columnFamilyHandle, new byte[] {}, beginKeyGroupBytes);
}
}

if (clipMaxRangeRequired) {
for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
db.compactRange(
columnFamilyHandle,
endKeyGroupBytes,
// This key is larger than the current limit for key groups
new byte[] {(byte) 0xFF, (byte) 0xFF});
}
}
}
Comparator<byte[]> comparator = UnsignedBytes.lexicographicalComparator();
return comparator.compare(dbKeyRange.minKey, beginKeyGroupBytes) >= 0
&& comparator.compare(dbKeyRange.maxKey, endKeyGroupBytes) < 0;
}

private static KeyRange getDBKeyRange(RocksDB db) {
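The new isSstDataInKeyGroupRange helper turns the former unconditional clip-and-compact step into a read-only check: the smallest and largest keys in the temporary DB are compared against the serialized bounds of the expected key-group range. Below is a minimal, self-contained sketch of that comparison. It is an illustration, not the Flink utility itself: it assumes the key-group id is written big-endian into keyGroupPrefixBytes bytes (mirroring CompositeKeySerializationUtils.serializeKeyGroup) and re-implements the unsigned lexicographic comparison locally instead of using Guava's UnsignedBytes.

// Standalone sketch (not the Flink utility itself) of the check performed by
// isSstDataInKeyGroupRange.
public class KeyGroupRangeCheckSketch {

    // Stand-in for CompositeKeySerializationUtils.serializeKeyGroup (assumed big-endian).
    static void serializeKeyGroup(int keyGroup, byte[] target) {
        for (int i = target.length - 1; i >= 0; i--) {
            target[i] = (byte) keyGroup;
            keyGroup >>>= 8;
        }
    }

    // Unsigned lexicographic comparison, like UnsignedBytes.lexicographicalComparator().
    static int compareUnsigned(byte[] a, byte[] b) {
        for (int i = 0; i < Math.min(a.length, b.length); i++) {
            int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    }

    public static void main(String[] args) {
        int keyGroupPrefixBytes = 2;

        // Expected key-group range of the state handle, e.g. [32, 63].
        byte[] begin = new byte[keyGroupPrefixBytes];
        byte[] end = new byte[keyGroupPrefixBytes];
        serializeKeyGroup(32, begin);     // begin = {0x00, 0x20}
        serializeKeyGroup(63 + 1, end);   // end   = {0x00, 0x40}, exclusive upper bound

        // Hypothetical smallest and largest keys found in the temp DB's SST files.
        byte[] dbMinKey = {0x00, 0x20, 0x01};
        byte[] dbMaxKey = {0x00, 0x3F, (byte) 0xFE};

        boolean inRange =
                compareUnsigned(dbMinKey, begin) >= 0 && compareUnsigned(dbMaxKey, end) < 0;
        System.out.println("SST data within expected key-group range: " + inRange); // true
    }
}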
@@ -111,6 +111,10 @@ protected RocksDBHandle(
this.writeBufferManagerCapacity = writeBufferManagerCapacity;
}

boolean isOpen() {
return db != null;
}

void openDB() throws IOException {
loadDb();
}
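The new isOpen() accessor on RocksDBHandle simply reports whether loadDb() has already produced a native db instance. Its callers are not shown in this hunk; the snippet below is only a hedged sketch of the kind of guard such an accessor enables (the ensureDbOpen helper is hypothetical, not part of this commit).

// Hypothetical helper, not part of this commit: open the handle only if needed.
void ensureDbOpen(RocksDBHandle rocksHandle) throws IOException {
    if (!rocksHandle.isOpen()) { // isOpen() reports db != null, per the change above
        rocksHandle.openDB();    // loads the native RocksDB instance
    }
}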
@@ -317,16 +317,27 @@ private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandl
.map(StateHandleDownloadSpec::createLocalStateHandleForDownloadedState)
.forEach(localKeyedStateHandles::add);

// Transfer remaining key-groups from temporary instance into base DB
byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
CompositeKeySerializationUtils.serializeKeyGroup(
keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes);

byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
CompositeKeySerializationUtils.serializeKeyGroup(
keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);

try {
// Process all state downloads
transferRemoteStateToLocalDirectory(allDownloadSpecs);

if (localKeyedStateHandles.size() > 1 && useIngestDbRestoreMode) {
// Optimized path for merging multiple handles with Ingest/Clip
rescaleClipIngestDB(localKeyedStateHandles);
rescaleClipIngestDB(
localKeyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
} else {
// Optimized path for single handle and legacy path for merging multiple handles.
rescaleCopyFromTemporaryInstance(localKeyedStateHandles);
rescaleCopyFromTemporaryInstance(
localKeyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
}

} finally {
@@ -338,16 +349,10 @@ private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandl
}

private void rescaleCopyFromTemporaryInstance(
Collection<IncrementalLocalKeyedStateHandle> localKeyedStateHandles) throws Exception {

// Transfer remaining key-groups from temporary instance into base DB
byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
CompositeKeySerializationUtils.serializeKeyGroup(
keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes);

byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
CompositeKeySerializationUtils.serializeKeyGroup(
keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);
Collection<IncrementalLocalKeyedStateHandle> localKeyedStateHandles,
byte[] startKeyGroupPrefixBytes,
byte[] stopKeyGroupPrefixBytes)
throws Exception {

// Choose the best state handle for the initial DB
final IncrementalLocalKeyedStateHandle selectedInitialHandle =
@@ -364,59 +369,68 @@ private void rescaleCopyFromTemporaryInstance(

for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) {
logger.info("Starting to restore from state handle: {} with rescaling.", stateHandle);

try (RestoredDBInstance tmpRestoreDBInfo =
restoreTempDBInstanceFromLocalState(stateHandle);
RocksDBWriteBatchWrapper writeBatchWrapper =
new RocksDBWriteBatchWrapper(
this.rocksHandle.getDb(), writeBatchSize)) {
copyTempDbIntoBaseDb(
tmpRestoreDBInfo,
writeBatchWrapper,
startKeyGroupPrefixBytes,
stopKeyGroupPrefixBytes);
}
logger.info("Finished restoring from state handle: {} with rescaling.", stateHandle);
}
}

List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors =
tmpRestoreDBInfo.columnFamilyDescriptors;
List<ColumnFamilyHandle> tmpColumnFamilyHandles =
tmpRestoreDBInfo.columnFamilyHandles;

// Iterating only the requested descriptors automatically skips the default
// column family handle.
for (int descIdx = 0; descIdx < tmpColumnFamilyDescriptors.size(); ++descIdx) {
ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(descIdx);

ColumnFamilyHandle targetColumnFamilyHandle =
this.rocksHandle.getOrRegisterStateColumnFamilyHandle(
null,
tmpRestoreDBInfo.stateMetaInfoSnapshots.get(descIdx))
.columnFamilyHandle;

try (RocksIteratorWrapper iterator =
RocksDBOperationUtils.getRocksIterator(
tmpRestoreDBInfo.db,
tmpColumnFamilyHandle,
tmpRestoreDBInfo.readOptions)) {

iterator.seek(startKeyGroupPrefixBytes);

while (iterator.isValid()) {

if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(
iterator.key(), stopKeyGroupPrefixBytes)) {
writeBatchWrapper.put(
targetColumnFamilyHandle, iterator.key(), iterator.value());
} else {
// Since the iterator visits records in sorted order, we can just break here.
break;
}

iterator.next();
}
} // releases native iterator resources
private void copyTempDbIntoBaseDb(
RestoredDBInstance tmpRestoreDBInfo,
RocksDBWriteBatchWrapper writeBatchWrapper,
byte[] startKeyGroupPrefixBytes,
byte[] stopKeyGroupPrefixBytes)
throws Exception {

List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors =
tmpRestoreDBInfo.columnFamilyDescriptors;
List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles;

// Iterating only the requested descriptors automatically skips the default
// column family handle.
for (int descIdx = 0; descIdx < tmpColumnFamilyDescriptors.size(); ++descIdx) {
ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(descIdx);

ColumnFamilyHandle targetColumnFamilyHandle =
this.rocksHandle.getOrRegisterStateColumnFamilyHandle(
null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(descIdx))
.columnFamilyHandle;

try (RocksIteratorWrapper iterator =
RocksDBOperationUtils.getRocksIterator(
tmpRestoreDBInfo.db,
tmpColumnFamilyHandle,
tmpRestoreDBInfo.readOptions)) {

iterator.seek(startKeyGroupPrefixBytes);

while (iterator.isValid()) {

if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(
iterator.key(), stopKeyGroupPrefixBytes)) {
writeBatchWrapper.put(
targetColumnFamilyHandle, iterator.key(), iterator.value());
} else {
// Since the iterator visits records in sorted order, we can just break here.
break;
}

iterator.next();
}
logger.info(
"Finished restoring from state handle: {} with rescaling.", stateHandle);
}
} // releases native iterator resources
}
}

@@ -426,7 +440,10 @@ private void rescaleCopyFromTemporaryInstance(
* are copied into the real restore instance and then the temporary instance is discarded.
*/
private void rescaleClipIngestDB(
Collection<IncrementalLocalKeyedStateHandle> localKeyedStateHandles) throws Exception {
Collection<IncrementalLocalKeyedStateHandle> localKeyedStateHandles,
byte[] startKeyGroupPrefixBytes,
byte[] stopKeyGroupPrefixBytes)
throws Exception {

final Path absolutInstanceBasePath = instanceBasePath.getAbsoluteFile().toPath();
final Path exportCfBasePath = absolutInstanceBasePath.resolve("export-cfs");
@@ -435,6 +452,8 @@ private void rescaleClipIngestDB(
final Map<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>>
exportedColumnFamilyMetaData = new HashMap<>();

rocksHandle.openDB();

try {
for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) {
logger.info(
@@ -447,39 +466,47 @@ List<ColumnFamilyHandle> tmpColumnFamilyHandles =
List<ColumnFamilyHandle> tmpColumnFamilyHandles =
tmpRestoreDBInfo.columnFamilyHandles;

// Use Range delete to clip the temp db to the target range of the backend
RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(
if (RocksDBIncrementalCheckpointUtils.isSstDataInKeyGroupRange(
tmpRestoreDBInfo.db,
tmpColumnFamilyHandles,
keyGroupRange,
stateHandle.getKeyGroupRange(),
keyGroupPrefixBytes);

// Clip all tmp db to the range specified in the corresponding state handle
RocksDBIncrementalCheckpointUtils.compactSstFilesToExpectedRange(
tmpRestoreDBInfo.db,
tmpColumnFamilyHandles,
keyGroupPrefixBytes,
stateHandle.getKeyGroupRange());

// Export all the Column Families and store the result in
// exportedColumnFamilyMetaData
RocksDBIncrementalCheckpointUtils.exportColumnFamilies(
tmpRestoreDBInfo.db,
tmpColumnFamilyHandles,
tmpRestoreDBInfo.stateMetaInfoSnapshots,
exportCfBasePath,
exportedColumnFamilyMetaData);
stateHandle.getKeyGroupRange())) {

// Use Range delete to clip the temp db to the target range of the backend
RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(
tmpRestoreDBInfo.db,
tmpColumnFamilyHandles,
keyGroupRange,
stateHandle.getKeyGroupRange(),
keyGroupPrefixBytes);

// Export all the Column Families and store the result in
// exportedColumnFamilyMetaData
RocksDBIncrementalCheckpointUtils.exportColumnFamilies(
tmpRestoreDBInfo.db,
tmpColumnFamilyHandles,
tmpRestoreDBInfo.stateMetaInfoSnapshots,
exportCfBasePath,
exportedColumnFamilyMetaData);
} else {
// Some SST data lies outside the handle's declared key-group range,
// so fall back to copying into the base DB.
try (RocksDBWriteBatchWrapper writeBatchWrapper =
new RocksDBWriteBatchWrapper(
this.rocksHandle.getDb(), writeBatchSize)) {
copyTempDbIntoBaseDb(
tmpRestoreDBInfo,
writeBatchWrapper,
startKeyGroupPrefixBytes,
stopKeyGroupPrefixBytes);
}
}
}
logger.info(
"Finished exporting column family from state handle: {} for rescaling.",
stateHandle);
}

// Open the target RocksDB and import the exported column families
this.rocksHandle.openDB();
exportedColumnFamilyMetaData.forEach(
this.rocksHandle::registerStateColumnFamilyHandleWithImport);
rocksHandle::registerStateColumnFamilyHandleWithImport);
logger.info(
"Finished importing exported column families into target DB for rescaling.");
} finally {
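Taken together, these changes make rescaleClipIngestDB decide per state handle between the fast clip/export/import path and the write-batch copy fallback. The following is a condensed, hedged outline of that new control flow, using the names from the diff; resource handling and exception plumbing are elided, so it is not standalone runnable code.

// Condensed outline of the new per-handle decision in rescaleClipIngestDB.
rocksHandle.openDB(); // the base DB is now opened up front, before the per-handle loop

for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) {
    try (RestoredDBInstance tmp = restoreTempDBInstanceFromLocalState(stateHandle)) {
        if (RocksDBIncrementalCheckpointUtils.isSstDataInKeyGroupRange(
                tmp.db, keyGroupPrefixBytes, stateHandle.getKeyGroupRange())) {
            // Fast path: all SST data already lies inside the handle's declared
            // key-group range, so clip to the backend's target range and export the
            // column families for a later bulk import into the base DB.
            RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(
                    tmp.db, tmp.columnFamilyHandles, keyGroupRange,
                    stateHandle.getKeyGroupRange(), keyGroupPrefixBytes);
            RocksDBIncrementalCheckpointUtils.exportColumnFamilies(
                    tmp.db, tmp.columnFamilyHandles, tmp.stateMetaInfoSnapshots,
                    exportCfBasePath, exportedColumnFamilyMetaData);
        } else {
            // Fallback: some data falls outside the declared range, so copy the
            // relevant key-groups into the base DB through a write batch instead.
            try (RocksDBWriteBatchWrapper batch =
                    new RocksDBWriteBatchWrapper(rocksHandle.getDb(), writeBatchSize)) {
                copyTempDbIntoBaseDb(
                        tmp, batch, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
            }
        }
    }
}
exportedColumnFamilyMetaData.forEach(rocksHandle::registerStateColumnFamilyHandleWithImport);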
