Skip to content

Commit

Permalink
Close and reopen temporary DBs
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanRRichter committed Feb 12, 2024
1 parent beee81f commit 8c90c30
Show file tree
Hide file tree
Showing 13 changed files with 817 additions and 545 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
import java.util.function.Supplier;

import static org.apache.flink.configuration.description.TextElement.text;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.INCREMENTAL_RESTORE_INSTANCE_BUFFER_SIZE;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.WRITE_BATCH_SIZE;
Expand Down Expand Up @@ -109,8 +109,6 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke

private static final double UNDEFINED_OVERLAP_FRACTION_THRESHOLD = -1;

private static final int UNDEFINED_INCREMENTAL_RESTORE_INSTANCE_BUFFER_SIZE = -1;

// ------------------------------------------------------------------------

// -- configuration values, set in the application / configuration
Expand Down Expand Up @@ -174,14 +172,18 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
*/
private final double overlapFractionThreshold;

private final int incrementalRestoreInstanceBufferSize;

/**
* Whether we use the optimized Ingest/Clip DB method for rescaling RocksDB incremental
* checkpoints.
*/
private final TernaryBoolean useIngestDbRestoreMode;

/**
* Whether we trigger an async compaction after restores for which we detect state in the
* database (including tombstones) that exceeds the proclaimed key-groups range of the backend.
*
* <p>{@link TernaryBoolean#UNDEFINED} means no explicit value was set on this instance; in that
* case the value is taken from the configuration when the backend is reconfigured, and the
* package-private getter falls back to the default of the corresponding config option.
*/
private final TernaryBoolean incrementalRestoreAsyncCompactAfterRescale;

/** Factory for Write Buffer Manager and Block Cache. */
private RocksDBMemoryFactory rocksDBMemoryFactory;
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -212,11 +214,10 @@ public EmbeddedRocksDBStateBackend(TernaryBoolean enableIncrementalCheckpointing
this.memoryConfiguration = new RocksDBMemoryConfiguration();
this.writeBatchSize = UNDEFINED_WRITE_BATCH_SIZE;
this.overlapFractionThreshold = UNDEFINED_OVERLAP_FRACTION_THRESHOLD;
this.incrementalRestoreInstanceBufferSize =
UNDEFINED_INCREMENTAL_RESTORE_INSTANCE_BUFFER_SIZE;
this.rocksDBMemoryFactory = RocksDBMemoryFactory.DEFAULT;
this.priorityQueueConfig = new RocksDBPriorityQueueConfig();
this.useIngestDbRestoreMode = TernaryBoolean.UNDEFINED;
this.incrementalRestoreAsyncCompactAfterRescale = TernaryBoolean.UNDEFINED;
}

/**
Expand Down Expand Up @@ -311,11 +312,11 @@ private EmbeddedRocksDBStateBackend(
overlapFractionThreshold >= 0 && this.overlapFractionThreshold <= 1,
"Overlap fraction threshold of restoring should be between 0 and 1");

incrementalRestoreInstanceBufferSize =
original.incrementalRestoreInstanceBufferSize
== UNDEFINED_INCREMENTAL_RESTORE_INSTANCE_BUFFER_SIZE
? config.get(INCREMENTAL_RESTORE_INSTANCE_BUFFER_SIZE)
: original.incrementalRestoreInstanceBufferSize;
incrementalRestoreAsyncCompactAfterRescale =
original.incrementalRestoreAsyncCompactAfterRescale == TernaryBoolean.UNDEFINED
? TernaryBoolean.fromBoxedBoolean(
config.get(INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE))
: original.incrementalRestoreAsyncCompactAfterRescale;

useIngestDbRestoreMode =
original.useIngestDbRestoreMode == TernaryBoolean.UNDEFINED
Expand Down Expand Up @@ -493,8 +494,8 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
resourceContainer.getMemoryWatcherOptions(nativeMetricOptions))
.setWriteBatchSize(getWriteBatchSize())
.setOverlapFractionThreshold(getOverlapFractionThreshold())
.setIncrementalRestoreInstanceBufferSize(
getIncrementalRestoreInstanceBufferSize())
.setIncrementalRestoreAsyncCompactAfterRescale(
getIncrementalRestoreAsyncCompactAfterRescale())
.setUseIngestDbRestoreMode(getUseIngestDbRestoreMode());
return builder.build();
}
Expand Down Expand Up @@ -834,11 +835,9 @@ public void setRocksDBMemoryFactory(RocksDBMemoryFactory rocksDBMemoryFactory) {
: overlapFractionThreshold;
}

int getIncrementalRestoreInstanceBufferSize() {
return incrementalRestoreInstanceBufferSize
== UNDEFINED_INCREMENTAL_RESTORE_INSTANCE_BUFFER_SIZE
? INCREMENTAL_RESTORE_INSTANCE_BUFFER_SIZE.defaultValue()
: incrementalRestoreInstanceBufferSize;
/**
 * Resolves the effective "async compact after rescale" flag for this backend.
 *
 * @return the value stored in this backend, or the default value of {@code
 *     INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE} if the stored value is undefined.
 */
boolean getIncrementalRestoreAsyncCompactAfterRescale() {
    // Fallback used when the ternary flag carries no explicit value.
    final boolean optionDefault = INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE.defaultValue();
    return incrementalRestoreAsyncCompactAfterRescale.getOrDefault(optionDefault);
}

boolean getUseIngestDbRestoreMode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,23 +266,20 @@ public class RocksDBConfigurableOptions implements Serializable {
+ "has a chance to be an initial handle. "
+ "The default value is 0.0, there is always a handle will be selected for initialization. ");

public static final ConfigOption<Integer> INCREMENTAL_RESTORE_INSTANCE_BUFFER_SIZE =
key("state.backend.rocksdb.incremental-restore-instance-buffer-size")
.intType()
.defaultValue(4)
.withDescription(
"The maximum number of open temporary RocksDB instances per backend in the lookahead buffer"
+ " during incremental restore with USE_INGEST_DB_RESTORE_MODE = true. Larger buffers"
+ " allow to potentially detect more state handles that can be imported directly"
+ " before we need to fall back to copy.");

public static final ConfigOption<Boolean> USE_INGEST_DB_RESTORE_MODE =
key("state.backend.rocksdb.use-ingest-db-restore-mode")
.booleanType()
.defaultValue(false)
.defaultValue(Boolean.FALSE)
.withDescription(
"A recovery mode that directly clips and ingests multiple DBs during state recovery if the keys"
+ " in the SST files does not exceed the declared key-group range. ");
+ " in the SST files does not exceed the declared key-group range.");

/**
* Boolean option (default {@code false}) under the key {@code
* state.backend.rocksdb.incremental-restore-async-compact-after-rescale}. Controls whether an
* async RocksDB compaction is started after a restore that left keys (including tombstones)
* outside the key-groups range of the backend in the database.
*/
public static final ConfigOption<Boolean> INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE =
key("state.backend.rocksdb.incremental-restore-async-compact-after-rescale")
.booleanType()
.defaultValue(Boolean.FALSE)
.withDescription(
"If true, an async compaction of RocksDB is started after every restore after which we detect keys (including tombstones) in the database that are outside the key-groups range of the backend.");

static final ConfigOption<?>[] CANDIDATE_CONFIGS =
new ConfigOption<?>[] {
Expand All @@ -309,7 +306,8 @@ public class RocksDBConfigurableOptions implements Serializable {
BLOOM_FILTER_BITS_PER_KEY,
BLOOM_FILTER_BLOCK_BASED_MODE,
RESTORE_OVERLAP_FRACTION_THRESHOLD,
INCREMENTAL_RESTORE_INSTANCE_BUFFER_SIZE
USE_INGEST_DB_RESTORE_MODE,
INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE
};

private static final Set<ConfigOption<?>> POSITIVE_INT_CONFIG_SET =
Expand All @@ -318,8 +316,7 @@ public class RocksDBConfigurableOptions implements Serializable {
MAX_BACKGROUND_THREADS,
LOG_FILE_NUM,
MAX_WRITE_BUFFER_NUMBER,
MIN_WRITE_BUFFER_NUMBER_TO_MERGE,
INCREMENTAL_RESTORE_INSTANCE_BUFFER_SIZE));
MIN_WRITE_BUFFER_NUMBER_TO_MERGE));

private static final Set<ConfigOption<?>> SIZE_CONFIG_SET =
new HashSet<>(
Expand Down
Loading

0 comments on commit 8c90c30

Please sign in to comment.