Skip to content

Commit

Permalink
[FLINK-31238] Roman's comments and more improvements to the original PR.
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanRRichter committed Jan 10, 2024
1 parent 420c5c1 commit e3d0938
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke

private static final double UNDEFINED_OVERLAP_FRACTION_THRESHOLD = -1;

private static final boolean UNDEFINED_USE_INGEST_DB_RESTORE_MODE = false;

// ------------------------------------------------------------------------

// -- configuration values, set in the application / configuration
Expand Down Expand Up @@ -180,9 +178,13 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
* The threshold of the overlap fraction between the handle's key-group range and target
* key-group range.
*/
private double overlapFractionThreshold;
private final double overlapFractionThreshold;

private boolean useIngestDbRestoreMode;
/**
* Whether we use the optimized Ingest/Clip DB method for rescaling RocksDB incremental
* checkpoints.
*/
private final TernaryBoolean useIngestDbRestoreMode;

/** Factory for Write Buffer Manager and Block Cache. */
private RocksDBMemoryFactory rocksDBMemoryFactory;
Expand Down Expand Up @@ -216,7 +218,7 @@ public EmbeddedRocksDBStateBackend(TernaryBoolean enableIncrementalCheckpointing
this.overlapFractionThreshold = UNDEFINED_OVERLAP_FRACTION_THRESHOLD;
this.rocksDBMemoryFactory = RocksDBMemoryFactory.DEFAULT;
this.priorityQueueConfig = new RocksDBPriorityQueueConfig();
this.useIngestDbRestoreMode = UNDEFINED_USE_INGEST_DB_RESTORE_MODE;
this.useIngestDbRestoreMode = TernaryBoolean.UNDEFINED;
}

/**
Expand Down Expand Up @@ -312,9 +314,9 @@ private EmbeddedRocksDBStateBackend(
"Overlap fraction threshold of restoring should be between 0 and 1");

useIngestDbRestoreMode =
original.useIngestDbRestoreMode == UNDEFINED_USE_INGEST_DB_RESTORE_MODE
? config.get(USE_INGEST_DB_RESTORE_MODE)
: original.useIngestDbRestoreMode;
original.useIngestDbRestoreMode == TernaryBoolean.UNDEFINED
? TernaryBoolean.fromBoxedBoolean(config.get(USE_INGEST_DB_RESTORE_MODE))
: TernaryBoolean.fromBoolean(original.getUseIngestDbRestoreMode());

this.rocksDBMemoryFactory = original.rocksDBMemoryFactory;
}
Expand Down Expand Up @@ -866,9 +868,7 @@ public void setRocksDBMemoryFactory(RocksDBMemoryFactory rocksDBMemoryFactory) {
}

/**
 * Returns whether the ingest-DB restore mode is in effect for this backend. If the setting is
 * undefined, the default value of {@code USE_INGEST_DB_RESTORE_MODE} is returned instead.
 */
boolean getUseIngestDbRestoreMode() {
// NOTE(review): this span is a diff rendering. The first return statement compares against
// a boolean sentinel constant and appears to be the removed version - confirm against the
// actual file.
return useIngestDbRestoreMode == UNDEFINED_USE_INGEST_DB_RESTORE_MODE
? USE_INGEST_DB_RESTORE_MODE.defaultValue()
: useIngestDbRestoreMode;
// Second version: resolves the value through getOrDefault on the field itself.
return useIngestDbRestoreMode.getOrDefault(USE_INGEST_DB_RESTORE_MODE.defaultValue());
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
Expand Down Expand Up @@ -119,14 +120,17 @@ public static void registerKvStateInformation(
*
* <p>Creates the column family for the state. Sets TTL compaction filter if {@code
* ttlCompactFiltersManager} is not {@code null}.
*
* @param importFilesMetaData if not empty, we import the files specified in the metadata to the
* column family.
*/
public static RocksDBKeyedStateBackend.RocksDbKvStateInfo createStateInfo(
RegisteredStateMetaInfoBase metaInfoBase,
RocksDB db,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
@Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
@Nullable Long writeBufferManagerCapacity,
@Nullable List<ExportImportFilesMetaData> cfMetaDataList) {
List<ExportImportFilesMetaData> importFilesMetaData) {

ColumnFamilyDescriptor columnFamilyDescriptor =
createColumnFamilyDescriptor(
Expand All @@ -135,10 +139,15 @@ public static RocksDBKeyedStateBackend.RocksDbKvStateInfo createStateInfo(
ttlCompactFiltersManager,
writeBufferManagerCapacity);

ColumnFamilyHandle columnFamilyHandle =
cfMetaDataList == null || cfMetaDataList.isEmpty()
? createColumnFamily(columnFamilyDescriptor, db)
: createColumnFamilyWithImport(columnFamilyDescriptor, db, cfMetaDataList);
final ColumnFamilyHandle columnFamilyHandle;
try {
columnFamilyHandle =
createColumnFamily(columnFamilyDescriptor, db, importFilesMetaData);
} catch (Exception ex) {
IOUtils.closeQuietly(columnFamilyDescriptor.getOptions());
throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", ex);
}

return new RocksDBKeyedStateBackend.RocksDbKvStateInfo(columnFamilyHandle, metaInfoBase);
}

Expand All @@ -154,7 +163,7 @@ public static RocksDBKeyedStateBackend.RocksDbKvStateInfo createStateInfo(
columnFamilyOptionsFactory,
ttlCompactFiltersManager,
writeBufferManagerCapacity,
null);
Collections.emptyList());
}

/**
Expand All @@ -168,15 +177,17 @@ public static ColumnFamilyDescriptor createColumnFamilyDescriptor(
@Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
@Nullable Long writeBufferManagerCapacity) {

byte[] nameBytes = metaInfoBase.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
Preconditions.checkState(
!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),
"The chosen state name 'default' collides with the name of the default column family!");

ColumnFamilyOptions options =
createColumnFamilyOptions(columnFamilyOptionsFactory, metaInfoBase.getName());

if (ttlCompactFiltersManager != null) {
ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(metaInfoBase, options);
}
byte[] nameBytes = metaInfoBase.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
Preconditions.checkState(
!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),
"The chosen state name 'default' collides with the name of the default column family!");

if (writeBufferManagerCapacity != null) {
// It'd be great to perform the check earlier, e.g. when creating write buffer manager.
Expand All @@ -203,8 +214,7 @@ public static ColumnFamilyDescriptor createColumnFamilyDescriptor(
* @return true if sanity check passes, false otherwise
*/
static boolean sanityCheckArenaBlockSize(
long writeBufferSize, long arenaBlockSizeConfigured, long writeBufferManagerCapacity)
throws IllegalStateException {
long writeBufferSize, long arenaBlockSizeConfigured, long writeBufferManagerCapacity) {

long defaultArenaBlockSize =
RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(writeBufferSize);
Expand Down Expand Up @@ -243,25 +253,16 @@ public static ColumnFamilyOptions createColumnFamilyOptions(
}

/**
 * Creates a column family for the given descriptor in the given DB.
 *
 * <p>NOTE(review): this span is a diff rendering in which two versions are interleaved. One
 * version has two helpers ({@code createColumnFamily} and {@code createColumnFamilyWithImport})
 * that each catch {@code RocksDBException}, close the descriptor's options quietly and rethrow
 * as {@code FlinkRuntimeException}. The other version is a single helper that declares {@code
 * throws RocksDBException} and chooses the creation path itself: a plain {@code
 * db.createColumnFamily} when {@code importFilesMetaData} is empty, otherwise {@code
 * db.createColumnFamilyWithImport} with a fresh {@code ImportColumnFamilyOptions}. Confirm
 * against the actual file which lines are current.
 */
private static ColumnFamilyHandle createColumnFamily(
ColumnFamilyDescriptor columnDescriptor, RocksDB db) {
try {
return db.createColumnFamily(columnDescriptor);
} catch (RocksDBException e) {
// Close the options held by the descriptor before propagating the failure.
IOUtils.closeQuietly(columnDescriptor.getOptions());
throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", e);
}
}

private static ColumnFamilyHandle createColumnFamilyWithImport(
ColumnFamilyDescriptor columnDescriptor,
RocksDB db,
List<ExportImportFilesMetaData> metaDataList) {
try {
List<ExportImportFilesMetaData> importFilesMetaData)
throws RocksDBException {

// An empty metadata list means there is nothing to import: create a plain column family.
if (importFilesMetaData.isEmpty()) {
return db.createColumnFamily(columnDescriptor);
} else {
return db.createColumnFamilyWithImport(
columnDescriptor, new ImportColumnFamilyOptions(), metaDataList);
} catch (RocksDBException e) {
IOUtils.closeQuietly(columnDescriptor.getOptions());
throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", e);
columnDescriptor, new ImportColumnFamilyOptions(), importFilesMetaData);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,31 +187,23 @@ RocksDbKvStateInfo getOrRegisterStateColumnFamilyHandle(
return registeredStateMetaInfoEntry;
}

/**
 * Registers a state under {@code stateMetaInfo.getName()} in {@code kvStateInformation}. The
 * state info is built by {@code RocksDBOperationUtils.createStateInfo}, which receives {@code
 * cfMetaDataList}. A state check asserts that no entry with that name is registered yet.
 *
 * <p>NOTE(review): this span is a diff rendering in which two versions are interleaved. One
 * returns the registered {@code RocksDbKvStateInfo}; the other returns {@code void} and passes
 * the result of {@code createStateInfo} directly to {@code registerKvStateInformation}.
 * Confirm against the actual file which lines are current.
 */
RocksDbKvStateInfo registerStateColumnFamilyHandleWithImport(
void registerStateColumnFamilyHandleWithImport(
RegisteredStateMetaInfoBase stateMetaInfo,
List<ExportImportFilesMetaData> cfMetaDataList) {

RocksDbKvStateInfo registeredStateMetaInfoEntry =
kvStateInformation.get(stateMetaInfo.getName());

// Both checks express the same precondition: the state name must not be registered yet.
Preconditions.checkState(registeredStateMetaInfoEntry == null);
Preconditions.checkState(!kvStateInformation.containsKey(stateMetaInfo.getName()));

registeredStateMetaInfoEntry =
RocksDBOperationUtils.registerKvStateInformation(
kvStateInformation,
nativeMetricMonitor,
stateMetaInfo.getName(),
RocksDBOperationUtils.createStateInfo(
stateMetaInfo,
db,
columnFamilyOptionsFactory,
ttlCompactFiltersManager,
writeBufferManagerCapacity,
cfMetaDataList);

RocksDBOperationUtils.registerKvStateInformation(
kvStateInformation,
nativeMetricMonitor,
stateMetaInfo.getName(),
registeredStateMetaInfoEntry);

return registeredStateMetaInfoEntry;
cfMetaDataList));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.apache.flink.util.function.TriConsumerWithException;

import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
Expand Down Expand Up @@ -109,9 +108,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper

private boolean isKeySerializerCompatibilityChecked;

private final TriConsumerWithException<
Collection<IncrementalLocalKeyedStateHandle>, byte[], byte[], Exception>
rescalingRestoreFromLocalStateOperation;
private final boolean useIngestDbRestoreMode;

public RocksDBIncrementalRestoreOperation(
String operatorIdentifier,
Expand Down Expand Up @@ -158,10 +155,7 @@ public RocksDBIncrementalRestoreOperation(
this.keyGroupPrefixBytes = keyGroupPrefixBytes;
this.keySerializerProvider = keySerializerProvider;
this.userCodeClassLoader = userCodeClassLoader;
this.rescalingRestoreFromLocalStateOperation =
useIngestDbRestoreMode
? this::rescaleClipIngestDB
: this::rescaleCopyFromTemporaryInstance;
this.useIngestDbRestoreMode = useIngestDbRestoreMode;
}

/** Root method that branches for different implementations of {@link KeyedStateHandle}. */
Expand Down Expand Up @@ -331,8 +325,16 @@ private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandl
CompositeKeySerializationUtils.serializeKeyGroup(
keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);

rescalingRestoreFromLocalStateOperation.accept(
localKeyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
if (useIngestDbRestoreMode) {
// Optimized path with Ingest/Clip
rescaleClipIngestDB(
localKeyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
} else {
// Legacy path
rescaleCopyFromTemporaryInstance(
localKeyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
}

} finally {
// Cleanup all download directories
allDownloadSpecs.stream()
Expand Down Expand Up @@ -587,6 +589,7 @@ private List<ColumnFamilyDescriptor> createColumnFamilyDescriptors(
for (StateMetaInfoSnapshot stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
RegisteredStateMetaInfoBase metaInfoBase =
RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);

ColumnFamilyDescriptor columnFamilyDescriptor =
RocksDBOperationUtils.createColumnFamilyDescriptor(
metaInfoBase,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import java.util.concurrent.RunnableFuture;
import java.util.stream.Collectors;

import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Matchers.any;
Expand Down Expand Up @@ -116,7 +117,14 @@ public static List<Object[]> modes() {
{
true,
(SupplierWithException<CheckpointStorage, IOException>)
JobManagerCheckpointStorage::new
JobManagerCheckpointStorage::new,
false
},
{
true,
(SupplierWithException<CheckpointStorage, IOException>)
JobManagerCheckpointStorage::new,
true
},
{
false,
Expand All @@ -126,7 +134,8 @@ public static List<Object[]> modes() {
TempDirUtils.newFolder(tempFolder).toURI().toString();
return new FileSystemCheckpointStorage(
new Path(checkpointPath), 0, -1);
}
},
false
}
});
}
Expand All @@ -137,6 +146,9 @@ public static List<Object[]> modes() {
@Parameter(value = 1)
public SupplierWithException<CheckpointStorage, IOException> storageSupplier;

@Parameter(value = 2)
public boolean useIngestDB;

// Store it because we need it for the cleanup test.
private String dbPath;
private RocksDB db = null;
Expand Down Expand Up @@ -168,6 +180,7 @@ protected ConfigurableStateBackend getStateBackend() throws IOException {
EmbeddedRocksDBStateBackend backend =
new EmbeddedRocksDBStateBackend(enableIncrementalCheckpointing);
Configuration configuration = new Configuration();
configuration.setBoolean(USE_INGEST_DB_RESTORE_MODE, useIngestDB);
configuration.set(
RocksDBOptions.TIMER_SERVICE_FACTORY,
EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
Expand Down Expand Up @@ -234,7 +247,8 @@ public void setupRocksKeyedStateBackend() throws Exception {
spy(db),
defaultCFHandle,
optionsContainer.getColumnOptions())
.setEnableIncrementalCheckpointing(enableIncrementalCheckpointing);
.setEnableIncrementalCheckpointing(enableIncrementalCheckpointing)
.setUseIngestDbRestoreMode(useIngestDB);

if (enableIncrementalCheckpointing) {
rocksDBStateUploader =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.jobgraph.JobGraph;
Expand Down Expand Up @@ -109,23 +110,29 @@ public class AutoRescalingITCase extends TestLogger {
private static final int slotsPerTaskManager = 2;
private static final int totalSlots = numTaskManagers * slotsPerTaskManager;

/**
 * Supplies the parameter combinations for this parameterized test.
 *
 * <p>NOTE(review): this span is a diff rendering in which two versions are interleaved. One
 * uses two-element rows (backend, buffersPerChannel). The other uses three-element rows that
 * add a {@code useIngestDB} flag, which is {@code true} only for the {@code {"rocksdb", 2,
 * true}} row. Confirm against the actual file which lines are current.
 */
@Parameterized.Parameters(name = "backend = {0}, buffersPerChannel = {1}")
@Parameterized.Parameters(name = "backend = {0}, buffersPerChannel = {1}, useIngestDB = {2}")
public static Collection<Object[]> data() {
return Arrays.asList(
new Object[][] {
{"rocksdb", 0}, {"rocksdb", 2}, {"filesystem", 0}, {"filesystem", 2}
{"rocksdb", 0, false},
{"rocksdb", 2, true},
{"filesystem", 0, false},
{"filesystem", 2, false}
});
}

/**
 * Stores the parameters of one test run in the corresponding fields.
 *
 * <p>NOTE(review): this span is a diff rendering; the two-argument and three-argument
 * signatures are the two versions of the same constructor. Confirm against the actual file.
 *
 * @param backend assigned to {@code this.backend}
 * @param buffersPerChannel assigned to {@code this.buffersPerChannel}
 * @param useIngestDB assigned to {@code this.useIngestDB}
 */
public AutoRescalingITCase(String backend, int buffersPerChannel) {
public AutoRescalingITCase(String backend, int buffersPerChannel, boolean useIngestDB) {
this.backend = backend;
this.buffersPerChannel = buffersPerChannel;
this.useIngestDB = useIngestDB;
}

private final String backend;

private final int buffersPerChannel;

private final boolean useIngestDB;

private String currentBackend = null;

enum OperatorCheckpointMethod {
Expand Down Expand Up @@ -154,6 +161,7 @@ public void setup() throws Exception {
final File savepointDir = temporaryFolder.newFolder();

config.setString(StateBackendOptions.STATE_BACKEND, currentBackend);
config.setBoolean(RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, useIngestDB);
config.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
config.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, true);
config.setString(
Expand Down

0 comments on commit e3d0938

Please sign in to comment.