
Merge pull request #2 from mayuehappy/check_before_ingest
check db key overlap before ingest and only ingest with NO_KEY_OVERLA…
StefanRRichter authored Jan 31, 2024
2 parents 36be388 + 3818b94 commit 620593b
Showing 3 changed files with 153 additions and 44 deletions.
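
The core of the change: before using the Clip/Ingest restore path, the key range of every temporary restore DB is computed from its live SST files, and ingest is only used when those ranges are pairwise disjoint (the NO_KEY_OVERLAP case from the commit title); otherwise restore falls back to the copy-based path. A minimal standalone sketch of that check, assuming unsigned lexicographic key order as in RocksDB's default bytewise comparator (the committed code uses Flink's BytePrimitiveArrayComparator instead; all names here are hypothetical):

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;

    final class KeyRangeOverlapCheck {

        // Returns true if any two of the given [smallest, largest] key ranges overlap.
        static boolean anyOverlap(List<byte[][]> ranges) {
            // Sort the ranges by their smallest key ...
            Comparator<byte[][]> bySmallestKey =
                    Comparator.comparing(range -> range[0], Arrays::compareUnsigned);
            ranges.sort(bySmallestKey);
            // ... then some pair overlaps iff a range starts at or before the end of
            // its predecessor (a shared boundary key counts as an overlap).
            for (int i = 0; i < ranges.size() - 1; i++) {
                if (Arrays.compareUnsigned(ranges.get(i)[1], ranges.get(i + 1)[0]) >= 0) {
                    return true;
                }
            }
            return false;
        }
    }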
File 1 of 3: RocksDBIncrementalCheckpointUtils.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
@@ -28,6 +30,7 @@
 import org.rocksdb.Checkpoint;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ExportImportFilesMetaData;
+import org.rocksdb.LiveFileMetaData;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 
@@ -190,6 +193,29 @@ public static void clipColumnFamilies(
         }
     }
 
+    public static Tuple2<byte[], byte[]> getDBKeyRange(RocksDB db) {
+        byte[] smallestKey = null;
+        byte[] largestKey = null;
+        BytePrimitiveArrayComparator comparator = new BytePrimitiveArrayComparator(true);
+        List<LiveFileMetaData> liveFilesMetaData = db.getLiveFilesMetaData();
+        for (LiveFileMetaData fileMetaData : liveFilesMetaData) {
+            byte[] sstSmallestKey = fileMetaData.smallestKey();
+            byte[] sstLargestKey = fileMetaData.largestKey();
+
+            if (smallestKey == null
+                    || (sstSmallestKey != null
+                            && comparator.compare(sstSmallestKey, smallestKey) < 0)) {
+                smallestKey = sstSmallestKey;
+            }
+            if (largestKey == null
+                    || (sstLargestKey != null
+                            && comparator.compare(sstLargestKey, largestKey) > 0)) {
+                largestKey = sstLargestKey;
+            }
+        }
+        return Tuple2.of(smallestKey, largestKey);
+    }
+
     public static void exportColumnFamilies(
             RocksDB db,
             List<ColumnFamilyHandle> columnFamilyHandles,
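One caveat worth knowing about getDBKeyRange: RocksDB's getLiveFilesMetaData() only describes flushed SST files, so keys that are still in the memtable do not show up in the computed range. That is harmless for the temporary restore instances here, which are opened directly from checkpoint SSTs, but a general-purpose caller should flush first. A hedged usage sketch (the path and helper name are hypothetical):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.rocksdb.FlushOptions;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    static Tuple2<byte[], byte[]> flushedKeyRange(String dbPath) throws RocksDBException {
        RocksDB.loadLibrary();
        try (RocksDB db = RocksDB.open(dbPath);
                FlushOptions flushOptions = new FlushOptions().setWaitForFlush(true)) {
            // Flush so that memtable contents are reflected in the live SST metadata.
            db.flush(flushOptions);
            // Both components are null for an empty DB (no live SST files).
            return RocksDBIncrementalCheckpointUtils.getDBKeyRange(db);
        }
    }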
File 2 of 3: RocksDBIncrementalRestoreOperation.java
@@ -20,6 +20,8 @@
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.contrib.streaming.state.RocksDBIncrementalCheckpointUtils;
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
 import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
@@ -82,6 +84,7 @@
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.runtime.state.StateUtil.unexpectedStateHandleException;
 
@@ -330,16 +333,45 @@ private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandl
                 CompositeKeySerializationUtils.serializeKeyGroup(
                         keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);
 
-            if (localKeyedStateHandles.size() > 1 && useIngestDbRestoreMode) {
-                // Optimized path for merging multiple handles with Ingest/Clip
-                rescaleClipIngestDB(
-                        localKeyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
-            } else {
-                // Optimized path for single handle and legacy path for merging multiple handles.
-                rescaleCopyFromTemporaryInstance(
-                        localKeyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
-            }
+            // Open all tmp DB instances.
+            // First, determine whether IngestDB can be used.
+            // If not, fall back to the old recovery method.
+            HashMap<IncrementalLocalKeyedStateHandle, RestoredDBInstance>
+                    stateHandleAndRestoreDBInstance = new HashMap<>();
+            try {
+                for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) {
+                    logger.info(
+                            "Starting to restore from state handle: {} with rescaling.",
+                            stateHandle);
+                    RestoredDBInstance tmpRestoreDBInfo =
+                            restoreTempDBInstanceFromLocalState(stateHandle);
+                    stateHandleAndRestoreDBInstance.put(stateHandle, tmpRestoreDBInfo);
+                }
+
+                boolean isRestoreDBsKeyOverlap =
+                        checkRestoreDBsKeyOverlap(stateHandleAndRestoreDBInstance.values());
+                if (localKeyedStateHandles.size() > 1
+                        && useIngestDbRestoreMode
+                        && !isRestoreDBsKeyOverlap) {
+                    // Optimized path for merging multiple handles with Ingest/Clip.
+                    logger.info("Rescaling with Clip/Ingest DB.");
+                    rescaleClipIngestDB(
+                            stateHandleAndRestoreDBInstance,
+                            startKeyGroupPrefixBytes,
+                            stopKeyGroupPrefixBytes);
+                } else {
+                    // Optimized path for a single handle and legacy path for merging multiple
+                    // handles.
+                    logger.info("Rescaling by copying from temporary instances.");
+                    rescaleCopyFromTemporaryInstance(
+                            stateHandleAndRestoreDBInstance,
+                            startKeyGroupPrefixBytes,
+                            stopKeyGroupPrefixBytes);
+                }
+            } finally {
+                // Close native RocksDB objects.
+                stateHandleAndRestoreDBInstance.values().forEach(IOUtils::closeAllQuietly);
+            }
         } finally {
             // Cleanup all download directories
             allDownloadSpecs.stream()
@@ -349,33 +381,32 @@ private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandl
     }
 
     private void rescaleCopyFromTemporaryInstance(
-            Collection<IncrementalLocalKeyedStateHandle> localKeyedStateHandles,
+            HashMap<IncrementalLocalKeyedStateHandle, RestoredDBInstance> localKeyedStateHandles,
             byte[] startKeyGroupPrefixBytes,
             byte[] stopKeyGroupPrefixBytes)
             throws Exception {
 
         // Choose the best state handle for the initial DB
         final IncrementalLocalKeyedStateHandle selectedInitialHandle =
                 RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
-                        localKeyedStateHandles, keyGroupRange, overlapFractionThreshold);
+                        localKeyedStateHandles.keySet(), keyGroupRange, overlapFractionThreshold);
 
         Preconditions.checkNotNull(selectedInitialHandle);
 
-        // Remove the selected handle from the list so that we don't restore it twice.
-        localKeyedStateHandles.remove(selectedInitialHandle);
-
         // Init the base DB instance with the initial state
         initBaseDBForRescaling(selectedInitialHandle);
 
-        for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) {
+        for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles.keySet()) {
             logger.info("Starting to restore from state handle: {} with rescaling.", stateHandle);
 
-            try (RestoredDBInstance tmpRestoreDBInfo =
-                            restoreTempDBInstanceFromLocalState(stateHandle);
-                    RocksDBWriteBatchWrapper writeBatchWrapper =
-                            new RocksDBWriteBatchWrapper(
-                                    this.rocksHandle.getDb(), writeBatchSize)) {
+            // Skip the selected handle so that we don't restore it twice.
+            if (stateHandle.equals(selectedInitialHandle)) {
+                continue;
+            }
+
+            try (RocksDBWriteBatchWrapper writeBatchWrapper =
+                    new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), writeBatchSize)) {
+                RestoredDBInstance tmpRestoreDBInfo = localKeyedStateHandles.get(stateHandle);
                 List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors =
                         tmpRestoreDBInfo.columnFamilyDescriptors;
                 List<ColumnFamilyHandle> tmpColumnFamilyHandles =
@@ -431,7 +462,7 @@ private void rescaleCopyFromTemporaryInstance(
      * are copied into the real restore instance and then the temporary instance is discarded.
      */
    private void rescaleClipIngestDB(
-            Collection<IncrementalLocalKeyedStateHandle> localKeyedStateHandles,
+            HashMap<IncrementalLocalKeyedStateHandle, RestoredDBInstance> stateHandleAndDBInstances,
             byte[] startKeyGroupPrefixBytes,
             byte[] stopKeyGroupPrefixBytes)
             throws Exception {
@@ -444,33 +475,40 @@ private void rescaleClipIngestDB(
         exportedColumnFamilyMetaData = new HashMap<>();
 
         try {
-            for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) {
+            for (IncrementalLocalKeyedStateHandle stateHandle :
+                    stateHandleAndDBInstances.keySet()) {
                 logger.info(
                         "Starting to restore from state handle: {} with rescaling using Clip/Ingest DB.",
                         stateHandle);
 
-                try (RestoredDBInstance tmpRestoreDBInfo =
-                        restoreTempDBInstanceFromLocalState(stateHandle)) {
-
-                    List<ColumnFamilyHandle> tmpColumnFamilyHandles =
-                            tmpRestoreDBInfo.columnFamilyHandles;
-
-                    // Clip all tmp db to Range [startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes)
-                    RocksDBIncrementalCheckpointUtils.clipColumnFamilies(
-                            tmpRestoreDBInfo.db,
-                            tmpColumnFamilyHandles,
-                            startKeyGroupPrefixBytes,
-                            stopKeyGroupPrefixBytes);
-
-                    // Export all the Column Families and store the result in
-                    // exportedColumnFamilyMetaData
-                    RocksDBIncrementalCheckpointUtils.exportColumnFamilies(
-                            tmpRestoreDBInfo.db,
-                            tmpColumnFamilyHandles,
-                            tmpRestoreDBInfo.stateMetaInfoSnapshots,
-                            exportCfBasePath,
-                            exportedColumnFamilyMetaData);
-                }
+                RestoredDBInstance tmpRestoreDBInfo = stateHandleAndDBInstances.get(stateHandle);
+                List<ColumnFamilyHandle> tmpColumnFamilyHandles =
+                        tmpRestoreDBInfo.columnFamilyHandles;
+
+                // Clipping the tmp DB to this handle's key-group range only needs deleteRange.
+                RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(
+                        tmpRestoreDBInfo.db,
+                        tmpColumnFamilyHandles,
+                        keyGroupRange,
+                        stateHandle.getKeyGroupRange(),
+                        keyGroupPrefixBytes);
+
+                // Alternative: clip all tmp DBs to [startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes)
+                // RocksDBIncrementalCheckpointUtils.clipColumnFamilies(
+                //         tmpRestoreDBInfo.db,
+                //         tmpColumnFamilyHandles,
+                //         startKeyGroupPrefixBytes,
+                //         stopKeyGroupPrefixBytes);
+
+                // Export all the column families and store the result in
+                // exportedColumnFamilyMetaData.
+                RocksDBIncrementalCheckpointUtils.exportColumnFamilies(
+                        tmpRestoreDBInfo.db,
+                        tmpColumnFamilyHandles,
+                        tmpRestoreDBInfo.stateMetaInfoSnapshots,
+                        exportCfBasePath,
+                        exportedColumnFamilyMetaData);
                 logger.info(
                         "Finished exporting column family from state handle: {} for rescaling.",
                         stateHandle);
@@ -511,6 +549,38 @@ private void initBaseDBForRescaling(IncrementalLocalKeyedStateHandle stateHandle
         }
     }
 
+    /** Checks whether the key ranges of any two of the given restored DB instances overlap. */
+    private boolean checkRestoreDBsKeyOverlap(Collection<RestoredDBInstance> dbInstances) {
+        List<Tuple2<byte[], byte[]>> dbKeyRanges =
+                dbInstances.stream()
+                        .map(dbInstance -> dbInstance.db)
+                        .map(RocksDBIncrementalCheckpointUtils::getDBKeyRange)
+                        .filter(keyRange -> keyRange.f0 != null)
+                        .collect(Collectors.toList());
+        BytePrimitiveArrayComparator comparator = new BytePrimitiveArrayComparator(true);
+        Collections.sort(dbKeyRanges, (t1, t2) -> comparator.compare(t1.f0, t2.f0));
+        // With the ranges sorted by smallest key, some pair overlaps iff a range starts at or
+        // before the end of its predecessor.
+        for (int i = 0; i < dbKeyRanges.size() - 1; i++) {
+            Tuple2<byte[], byte[]> curKeyRange = dbKeyRanges.get(i);
+            byte[] curLargestKey = curKeyRange.f1;
+            Tuple2<byte[], byte[]> nextKeyRange = dbKeyRanges.get(i + 1);
+            byte[] nextSmallestKey = nextKeyRange.f0;
+            // >= 0 so that a shared boundary key also counts as an overlap.
+            if (comparator.compare(curLargestKey, nextSmallestKey) >= 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** Debug helper that prints a byte array as space-separated hex. */
+    public static void printByteArrayAsHex(byte[] byteArray) {
+        for (byte b : byteArray) {
+            String hex = String.format("%02X", b);
+            System.out.print(hex + " ");
+        }
+    }
+
     /** Entity to hold the temporary RocksDB instance created for restore. */
     private static class RestoredDBInstance implements AutoCloseable {
 
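For orientation, the Clip/Ingest path combines three RocksDB building blocks: range deletes to clip each temporary instance to the key groups it should contribute, Checkpoint-based export of the clipped column families, and an import/ingest of the exported files into the base DB (here via the ingestDB support of the forked RocksDB; stock RocksJava exposes the related createColumnFamilyWithImport). A rough sketch of just the clip step, assuming keys carry fixed-length serialized key-group prefixes (helper names hypothetical, not the committed clipDBWithKeyGroupRange):

    import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    // Removes all key groups outside [targetStart, targetEnd] from a temporary instance
    // whose data spans the key groups [curStart, curEnd]. Because keys are prefixed with a
    // fixed-length, big-endian key-group id, two range deletes on the prefixes suffice.
    static void clipToKeyGroupRange(
            RocksDB db,
            ColumnFamilyHandle handle,
            int curStart,
            int curEnd,
            int targetStart,
            int targetEnd,
            int keyGroupPrefixBytes)
            throws RocksDBException {
        if (curStart < targetStart) {
            // Drop everything below the target range: [curStart, targetStart).
            db.deleteRange(
                    handle,
                    keyGroupPrefix(curStart, keyGroupPrefixBytes),
                    keyGroupPrefix(targetStart, keyGroupPrefixBytes));
        }
        if (curEnd > targetEnd) {
            // Drop everything above the target range: [targetEnd + 1, curEnd + 1).
            db.deleteRange(
                    handle,
                    keyGroupPrefix(targetEnd + 1, keyGroupPrefixBytes),
                    keyGroupPrefix(curEnd + 1, keyGroupPrefixBytes));
        }
    }

    static byte[] keyGroupPrefix(int keyGroup, int keyGroupPrefixBytes) {
        byte[] prefix = new byte[keyGroupPrefixBytes];
        CompositeKeySerializationUtils.serializeKeyGroup(keyGroup, prefix);
        return prefix;
    }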
File 3 of 3: rescaling test
@@ -41,6 +41,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.RunnableFuture;
 import java.util.stream.Collectors;
 
@@ -72,6 +73,16 @@ public void testScaleIn_8_2() throws Exception {
         testRescale(8, 2, 100_000_000, 10);
     }
 
+    @Test
+    public void testScaleOut_2_3() throws Exception {
+        testRescale(2, 3, 100_000_000, 10);
+    }
+
+    @Test
+    public void testScaleIn_3_2() throws Exception {
+        testRescale(3, 2, 100_000_000, 10);
+    }
+
     public void testRescale(
             int startParallelism, int restoreParallelism, int numKeys, int updateDistance)
             throws Exception {
@@ -139,10 +150,12 @@ public void testRescale(
                                 getSharedInstance(),
                                 fromLocalFile(
                                         TempDirUtils.newFolder(
-                                                tempFolder, "checkpointsDir_" + i)),
+                                                tempFolder,
+                                                "checkpointsDir_" + UUID.randomUUID() + i)),
                                 fromLocalFile(
                                         TempDirUtils.newFolder(
-                                                tempFolder, "sharedStateDir_" + i)),
+                                                tempFolder,
+                                                "sharedStateDir_" + UUID.randomUUID() + i)),
                                 1,
                                 4096);
 
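The new 2 to 3 and 3 to 2 cases are interesting because with parallelisms that do not divide evenly, a restored subtask's key-group range is stitched together from several old handles, which is exactly the multi-handle path that the overlap check gates. A small demo of how the ranges split, using Flink's real assignment function (a max parallelism of 128 is assumed for illustration):

    import org.apache.flink.runtime.state.KeyGroupRange;
    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    public final class KeyGroupRangeDemo {
        public static void main(String[] args) {
            int maxParallelism = 128;
            for (int parallelism : new int[] {2, 3}) {
                for (int subtask = 0; subtask < parallelism; subtask++) {
                    KeyGroupRange range =
                            KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
                                    maxParallelism, parallelism, subtask);
                    System.out.printf(
                            "parallelism=%d subtask=%d -> key groups [%d, %d]%n",
                            parallelism,
                            subtask,
                            range.getStartKeyGroup(),
                            range.getEndKeyGroup());
                }
            }
            // Prints [0, 63] and [64, 127] for parallelism 2, and [0, 42], [43, 85],
            // [86, 127] for parallelism 3: subtask 1 of the new parallelism 3 overlaps
            // both old subtasks, so its restore has to merge two state handles.
        }
    }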
