diff --git a/docs/content.zh/docs/ops/traces.md b/docs/content.zh/docs/ops/traces.md index d5025b44d02bf..8157d1ecddd2a 100644 --- a/docs/content.zh/docs/ops/traces.md +++ b/docs/content.zh/docs/ops/traces.md @@ -97,7 +97,7 @@ Flink reports a single span trace for the whole checkpoint and job initializatio - org.apache.flink.
runtime.checkpoint.
CheckpointStatsTracker + org.apache.flink.
runtime.checkpoint.
CheckpointStatsTracker Checkpoint startTs Timestamp when the checkpoint has started. @@ -123,7 +123,7 @@ Flink reports a single span trace for the whole checkpoint and job initializatio What was the state of this checkpoint: FAILED or COMPLETED. - JobInitialization + JobInitialization startTs Timestamp when the job initialization has started. @@ -157,7 +157,11 @@ Flink reports a single span trace for the whole checkpoint and job initializatio (Max/Sum)DownloadStateDurationMs

(optional - currently only supported by RocksDB Incremental) - The aggregated (max and sum) across all subtasks duration of downloading state files from the DFS. + The aggregated (max and sum) duration across all subtasks of downloading state files from the DFS. + + + (Max/Sum)RestoreStateDurationMs

(optional - currently only supported by RocksDB Incremental) + The aggregated (max and sum) duration across all subtasks of restoring the state backend from fully localized state, i.e. after all remote state was downloaded. (Max/Sum)RestoredStateSizeBytes.[location] @@ -167,6 +171,10 @@ Flink reports a single span trace for the whole checkpoint and job initializatio REMOTE, UNKNOWN. + + (Max/Sum)RestoreAsyncCompactionDurationMs

(optional - currently only supported by RocksDB Incremental) + The aggregated (max and sum) duration across all subtasks for async compaction after incremental restore. + diff --git a/docs/content/docs/ops/traces.md b/docs/content/docs/ops/traces.md index d5025b44d02bf..8157d1ecddd2a 100644 --- a/docs/content/docs/ops/traces.md +++ b/docs/content/docs/ops/traces.md @@ -97,7 +97,7 @@ Flink reports a single span trace for the whole checkpoint and job initializatio - org.apache.flink.
runtime.checkpoint.
CheckpointStatsTracker + org.apache.flink.
runtime.checkpoint.
CheckpointStatsTracker Checkpoint startTs Timestamp when the checkpoint has started. @@ -123,7 +123,7 @@ Flink reports a single span trace for the whole checkpoint and job initializatio What was the state of this checkpoint: FAILED or COMPLETED. - JobInitialization + JobInitialization startTs Timestamp when the job initialization has started. @@ -157,7 +157,11 @@ Flink reports a single span trace for the whole checkpoint and job initializatio (Max/Sum)DownloadStateDurationMs

(optional - currently only supported by RocksDB Incremental) - The aggregated (max and sum) across all subtasks duration of downloading state files from the DFS. + The aggregated (max and sum) duration across all subtasks of downloading state files from the DFS. + + + (Max/Sum)RestoreStateDurationMs

(optional - currently only supported by RocksDB Incremental) + The aggregated (max and sum) duration across all subtasks of restoring the state backend from fully localized state, i.e. after all remote state was downloaded. (Max/Sum)RestoredStateSizeBytes.[location] @@ -167,6 +171,10 @@ Flink reports a single span trace for the whole checkpoint and job initializatio REMOTE, UNKNOWN. + + (Max/Sum)RestoreAsyncCompactionDurationMs

(optional - currently only supported by RocksDB Incremental) + The aggregated (max and sum) duration across all subtasks for async compaction after incremental restore. + diff --git a/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html b/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html index 8810b9086d1fe..4ec59a27110e8 100644 --- a/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html +++ b/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html @@ -74,6 +74,12 @@ Integer The maximum number of open files (per stateful operator) that can be used by the DB, '-1' means no limit. The default value is '-1'. + +
state.backend.rocksdb.incremental-restore-async-compact-after-rescale
+ false + Boolean + If true, an async compaction of RocksDB is started after every restore in which we detect keys (including tombstones) in the database that are outside the key-groups range of the backend. +
state.backend.rocksdb.log.dir
(none) @@ -116,6 +122,12 @@ Boolean If true, every newly created SST file will contain a Bloom filter. It is disabled by default. + +
state.backend.rocksdb.use-ingest-db-restore-mode
+ false + Boolean + A recovery mode that directly clips and ingests multiple DBs during state recovery if the keys in the SST files do not exceed the declared key-group range. +
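For illustration, the two new options documented above are ordinary Flink configuration keys, so they can be applied to an embedded RocksDB backend like any other setting. A minimal sketch using Flink's public configuration API (the user code class loader passed to configure() is assumed to be available in scope):

    Configuration config = new Configuration();
    config.setString("state.backend.rocksdb.use-ingest-db-restore-mode", "true");
    config.setString("state.backend.rocksdb.incremental-restore-async-compact-after-rescale", "true");
    // configure() returns a copy of the backend with the additional options applied.
    EmbeddedRocksDBStateBackend backend =
            new EmbeddedRocksDBStateBackend().configure(config, userClassLoader);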
state.backend.rocksdb.write-batch-size
2 mb diff --git a/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java b/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java index bf1714cb5b8ba..38adccc2cd943 100644 --- a/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java @@ -56,10 +56,21 @@ private CollectionUtil() { throw new AssertionError(); } + /** Returns true if the given collection is null or empty. */ public static boolean isNullOrEmpty(Collection<?> collection) { return collection == null || collection.isEmpty(); } + /** Returns true if the given collection is empty or contains only null elements. */ + public static boolean isEmptyOrAllElementsNull(Collection<?> collection) { + for (Object o : collection) { + if (o != null) { + return false; + } + } + return true; + } + public static boolean isNullOrEmpty(Map<?, ?> map) { return map == null || map.isEmpty(); } @@ -214,4 +225,35 @@ static int computeRequiredCapacity(int expectedSize, float loadFactor) { ? (int) Math.ceil(expectedSize / loadFactor) : Integer.MAX_VALUE; } + + /** + * Casts the given collection to a subtype. This is an unchecked cast that can lead to runtime + * exceptions. + * + * @param collection the collection to cast. + * @return the collection unchecked-cast to a subtype. + * @param <T> the subtype to cast to. + */ + public static <T> Collection<T> subTypeCast(Collection<? super T> collection) { + @SuppressWarnings("unchecked") + Collection<T> result = (Collection<T>) collection; + return result; + } + + /** + * Casts the given collection to a subtype. This is a checked cast. + * + * @param collection the collection to cast. + * @param subTypeClass the class of the subtype to cast to. + * @return the collection checked and cast to a subtype. + * @param <T> the subtype to cast to. + */ + public static <T> Collection<T> checkedSubTypeCast( + Collection<? super T> collection, Class<T> subTypeClass) { + for (Object o : collection) { + // probe each object, will throw ClassCastException on mismatch. + subTypeClass.cast(o); + } + return subTypeCast(collection); + } } diff --git a/flink-core/src/test/java/org/apache/flink/util/CollectionUtilTest.java b/flink-core/src/test/java/org/apache/flink/util/CollectionUtilTest.java index fb2abfec020b5..50493316e5158 100644 --- a/flink-core/src/test/java/org/apache/flink/util/CollectionUtilTest.java +++ b/flink-core/src/test/java/org/apache/flink/util/CollectionUtilTest.java @@ -22,12 +22,16 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; import java.util.List; import static org.apache.flink.util.CollectionUtil.HASH_MAP_DEFAULT_LOAD_FACTOR; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; /** Tests for java collection utilities. 
*/ @ExtendWith(TestLoggerExtension.class) @@ -107,4 +111,47 @@ public void testComputeCapacity() { } catch (IllegalArgumentException expected) { } } + + @Test + public void testIsEmptyOrAllElementsNull() { + Assertions.assertTrue(CollectionUtil.isEmptyOrAllElementsNull(Collections.emptyList())); + Assertions.assertTrue( + CollectionUtil.isEmptyOrAllElementsNull(Collections.singletonList(null))); + Assertions.assertTrue(CollectionUtil.isEmptyOrAllElementsNull(Arrays.asList(null, null))); + Assertions.assertFalse( + CollectionUtil.isEmptyOrAllElementsNull(Collections.singletonList("test"))); + Assertions.assertFalse( + CollectionUtil.isEmptyOrAllElementsNull(Arrays.asList(null, "test"))); + Assertions.assertFalse( + CollectionUtil.isEmptyOrAllElementsNull(Arrays.asList("test", null))); + Assertions.assertFalse( + CollectionUtil.isEmptyOrAllElementsNull(Arrays.asList(null, "test", null))); + } + + @Test + public void testCheckedSubTypeCast() { + List<A> list = new ArrayList<>(); + B b = new B(); + C c = new C(); + list.add(b); + list.add(c); + list.add(null); + Collection<B> castSuccess = CollectionUtil.checkedSubTypeCast(list, B.class); + Iterator<B> iterator = castSuccess.iterator(); + Assertions.assertEquals(b, iterator.next()); + Assertions.assertEquals(c, iterator.next()); + Assertions.assertNull(iterator.next()); + Assertions.assertFalse(iterator.hasNext()); + try { + Collection<C> castFail = CollectionUtil.checkedSubTypeCast(list, C.class); + fail("Expected ClassCastException"); + } catch (ClassCastException expected) { + } + } + + static class A {} + + static class B extends A {} + + static class C extends B {} } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java index 4aa375020c95b..392ad8471173d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java @@ -69,7 +69,10 @@ private MetricNames() {} public static final String INITIALIZE_STATE_DURATION = "InitializeStateDurationMs"; public static final String GATE_RESTORE_DURATION = "GateRestoreDurationMs"; public static final String DOWNLOAD_STATE_DURATION = "DownloadStateDurationMs"; + public static final String RESTORE_STATE_DURATION = "RestoreStateDurationMs"; public static final String RESTORED_STATE_SIZE = "RestoredStateSizeBytes"; + public static final String RESTORE_ASYNC_COMPACTION_DURATION = + "RestoreAsyncCompactionDurationMs"; public static final String START_WORKER_FAILURE_RATE = "startWorkFailure" + SUFFIX_RATE; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java index 250d77c9f6d52..f677acedfd7e9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java @@ -129,6 +129,8 @@ public String toString() { + delegate + ", offsets=" + offsets + + ", size=" + + size + '}'; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java index 00da919b3811a..c8fe9ef465280 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java @@ -34,7 +34,7 @@ /** An abstract base implementation of the {@link StateBackendBuilder} interface. */ public abstract class AbstractKeyedStateBackendBuilder<K> - implements StateBackendBuilder<AbstractKeyedStateBackend, BackendBuildingException> { + implements StateBackendBuilder<AbstractKeyedStateBackend<K>, BackendBuildingException> { protected final Logger logger = LoggerFactory.getLogger(getClass()); protected final TaskKvStateRegistry kvStateRegistry; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java index 4e9ff68218419..f9d5d24636df0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java @@ -109,6 +109,12 @@ public int hashCode() { @Override public String toString() { - return "DirectoryStateHandle{" + "directory=" + directoryString + '}'; + return "DirectoryStateHandle{" + + "directory='" + + directoryString + + '\'' + + ", directorySize=" + + directorySize + + '}'; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java index f162efa936bac..08e0fba18bd4a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java @@ -100,5 +100,16 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(handle, localPath); } + + @Override + public String toString() { + return "HandleAndLocalPath{" + + "handle=" + + handle + + ", localPath='" + + localPath + + '\'' + + '}'; + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java index 8b4be574d28bb..49f943a7b15bc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java @@ -135,6 +135,10 @@ public String toString() { + '}'; } + public String prettyPrintInterval() { + return "[" + startKeyGroup + ", " + endKeyGroup + "]"; + } + @Override public Iterator<Integer> iterator() { return new KeyGroupIterator(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index d036955d0c876..1cec488d3891e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -53,6 +53,7 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; @@ -217,13 +218,19 @@ protected <K> CheckpointableKeyedStateBackend<K> createKeyedBackend( keyGroupRange, env.getTaskKvStateRegistry(), TtlTimeProvider.DEFAULT, - new UnregisteredMetricsGroup(), + getMetricGroup(), 
+ getCustomInitializationMetrics(), Collections.emptyList(), - new CloseableRegistry())); + new CloseableRegistry(), + 1.0d)); return backend; } + protected StateBackend.CustomInitializationMetrics getCustomInitializationMetrics() { + return (name, value) -> {}; + } + protected CheckpointableKeyedStateBackend restoreKeyedBackend( TypeSerializer keySerializer, KeyedStateHandle state) throws Exception { return restoreKeyedBackend(keySerializer, state, env); @@ -255,9 +262,15 @@ protected CheckpointableKeyedStateBackend restoreKeyedBackend( keyGroupRange, env.getTaskKvStateRegistry(), TtlTimeProvider.DEFAULT, - new UnregisteredMetricsGroup(), + getMetricGroup(), + getCustomInitializationMetrics(), state, - new CloseableRegistry())); + new CloseableRegistry(), + 1.0d)); + } + + protected MetricGroup getMetricGroup() { + return new UnregisteredMetricsGroup(); } @TestTemplate @@ -283,9 +296,11 @@ void testEnableStateLatencyTracking() throws Exception { groupRange, kvStateRegistry, TtlTimeProvider.DEFAULT, - new UnregisteredMetricsGroup(), + getMetricGroup(), + getCustomInitializationMetrics(), Collections.emptyList(), - cancelStreamRegistry)); + cancelStreamRegistry, + 1.0d)); try { KeyedStateBackend nested = keyedStateBackend instanceof TestableKeyedStateBackend diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java index 5211e951e43ce..909c3c6170480 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java @@ -123,4 +123,9 @@ public void testMaterializedRestorePriorityQueue() throws Exception { ChangelogStateBackendTestUtils.testMaterializedRestoreForPriorityQueue( getStateBackend(), env, streamFactory); } + + @Override + protected boolean checkMetrics() { + return false; + } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/pom.xml b/flink-state-backends/flink-statebackend-rocksdb/pom.xml index 98aa501b1aa02..1a39fc84b7f63 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/pom.xml +++ b/flink-state-backends/flink-statebackend-rocksdb/pom.xml @@ -19,8 +19,8 @@ under the License. --> + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> 4.0.0 @@ -102,11 +102,18 @@ under the License. 
- **/org/apache/flink/contrib/streaming/state/RocksDBTestUtils* - **/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest* + + **/org/apache/flink/contrib/streaming/state/RocksDBTestUtils* + + + **/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest* + - **/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest* - **/org/apache/flink/contrib/streaming/state/benchmark/* + + **/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest* + + **/org/apache/flink/contrib/streaming/state/benchmark/* + META-INF/LICENSE META-INF/NOTICE diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java index 6d15c53c1a287..0c5e748772e02 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java @@ -70,7 +70,9 @@ import java.util.function.Supplier; import static org.apache.flink.configuration.description.TextElement.text; +import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE; import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD; +import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE; import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.WRITE_BATCH_SIZE; import static org.apache.flink.contrib.streaming.state.RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM; import static org.apache.flink.util.Preconditions.checkArgument; @@ -168,7 +170,19 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke * The threshold of the overlap fraction between the handle's key-group range and target * key-group range. */ - private double overlapFractionThreshold; + private final double overlapFractionThreshold; + + /** + * Whether we use the optimized Ingest/Clip DB method for rescaling RocksDB incremental + * checkpoints. + */ + private final TernaryBoolean useIngestDbRestoreMode; + + /** + * Whether we trigger an async compaction after restores for which we detect state in the + * database (including tombstones) that exceed the proclaimed key-groups range of the backend. + */ + private final TernaryBoolean incrementalRestoreAsyncCompactAfterRescale; /** Factory for Write Buffer Manager and Block Cache. 
*/ private RocksDBMemoryFactory rocksDBMemoryFactory; @@ -202,6 +216,8 @@ public EmbeddedRocksDBStateBackend(TernaryBoolean enableIncrementalCheckpointing this.overlapFractionThreshold = UNDEFINED_OVERLAP_FRACTION_THRESHOLD; this.rocksDBMemoryFactory = RocksDBMemoryFactory.DEFAULT; this.priorityQueueConfig = new RocksDBPriorityQueueConfig(); + this.useIngestDbRestoreMode = TernaryBoolean.UNDEFINED; + this.incrementalRestoreAsyncCompactAfterRescale = TernaryBoolean.UNDEFINED; } /** @@ -296,6 +312,17 @@ private EmbeddedRocksDBStateBackend( overlapFractionThreshold >= 0 && this.overlapFractionThreshold <= 1, "Overlap fraction threshold of restoring should be between 0 and 1"); + incrementalRestoreAsyncCompactAfterRescale = + original.incrementalRestoreAsyncCompactAfterRescale == TernaryBoolean.UNDEFINED + ? TernaryBoolean.fromBoxedBoolean( + config.get(INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE)) + : original.incrementalRestoreAsyncCompactAfterRescale; + + useIngestDbRestoreMode = + original.useIngestDbRestoreMode == TernaryBoolean.UNDEFINED + ? TernaryBoolean.fromBoxedBoolean(config.get(USE_INGEST_DB_RESTORE_MODE)) + : TernaryBoolean.fromBoolean(original.getUseIngestDbRestoreMode()); + this.rocksDBMemoryFactory = original.rocksDBMemoryFactory; } @@ -466,7 +493,10 @@ public AbstractKeyedStateBackend createKeyedStateBackend( .setNativeMetricOptions( resourceContainer.getMemoryWatcherOptions(nativeMetricOptions)) .setWriteBatchSize(getWriteBatchSize()) - .setOverlapFractionThreshold(getOverlapFractionThreshold()); + .setOverlapFractionThreshold(getOverlapFractionThreshold()) + .setIncrementalRestoreAsyncCompactAfterRescale( + getIncrementalRestoreAsyncCompactAfterRescale()) + .setUseIngestDbRestoreMode(getUseIngestDbRestoreMode()); return builder.build(); } @@ -805,6 +835,15 @@ public void setRocksDBMemoryFactory(RocksDBMemoryFactory rocksDBMemoryFactory) { : overlapFractionThreshold; } + boolean getIncrementalRestoreAsyncCompactAfterRescale() { + return incrementalRestoreAsyncCompactAfterRescale.getOrDefault( + INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE.defaultValue()); + } + + boolean getUseIngestDbRestoreMode() { + return useIngestDbRestoreMode.getOrDefault(USE_INGEST_DB_RESTORE_MODE.defaultValue()); + } + // ------------------------------------------------------------------------ // utilities // ------------------------------------------------------------------------ diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java index 5795e72909d00..8acf8af11fa5d 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java @@ -309,6 +309,21 @@ public class RocksDBConfigurableOptions implements Serializable { + "has a chance to be an initial handle. " + "The default value is 0.0, there is always a handle will be selected for initialization. 
"); + public static final ConfigOption USE_INGEST_DB_RESTORE_MODE = + key("state.backend.rocksdb.use-ingest-db-restore-mode") + .booleanType() + .defaultValue(Boolean.FALSE) + .withDescription( + "A recovery mode that directly clips and ingests multiple DBs during state recovery if the keys" + + " in the SST files does not exceed the declared key-group range."); + + public static final ConfigOption INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE = + key("state.backend.rocksdb.incremental-restore-async-compact-after-rescale") + .booleanType() + .defaultValue(Boolean.FALSE) + .withDescription( + "If true, an async compaction of RocksDB is started after every restore after which we detect keys (including tombstones) in the database that are outside the key-groups range of the backend."); + static final ConfigOption[] CANDIDATE_CONFIGS = new ConfigOption[] { // configurable DBOptions @@ -334,7 +349,9 @@ public class RocksDBConfigurableOptions implements Serializable { USE_BLOOM_FILTER, BLOOM_FILTER_BITS_PER_KEY, BLOOM_FILTER_BLOCK_BASED_MODE, - RESTORE_OVERLAP_FRACTION_THRESHOLD + RESTORE_OVERLAP_FRACTION_THRESHOLD, + USE_INGEST_DB_RESTORE_MODE, + INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE }; private static final Set> POSITIVE_INT_CONFIG_SET = diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java index a835d10c481a8..99b97ef516459 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java @@ -20,8 +20,12 @@ import org.apache.flink.runtime.state.CompositeKeySerializationUtils; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.util.function.RunnableWithException; + +import org.apache.flink.shaded.guava31.com.google.common.primitives.UnsignedBytes; import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.LiveFileMetaData; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; @@ -31,12 +35,15 @@ import java.util.Collection; import java.util.Comparator; +import java.util.Iterator; import java.util.List; +import java.util.Optional; /** Utils for RocksDB Incremental Checkpoint. */ public class RocksDBIncrementalCheckpointUtils { + /** - * Evaluates state handle's "score" regarding to the target range when choosing the best state + * Evaluates state handle's "score" regarding the target range when choosing the best state * handle to init the initial db for recovery, if the overlap fraction is less than * overlapFractionThreshold, then just return {@code Score.MIN} to mean the handle has no chance * to be the initial handle. 
@@ -85,9 +92,11 @@ public double getOverlapFraction() { } @Override - public int compareTo(@Nonnull Score other) { - return Comparator.comparing(Score::getIntersectGroupRange) - .thenComparing(Score::getOverlapFraction) + public int compareTo(@Nullable Score other) { + return Comparator.nullsFirst( + Comparator.comparing(Score::getIntersectGroupRange) + .thenComparing(Score::getOverlapFraction)) .compare(this, other); } } @@ -154,6 +163,194 @@ private static void deleteRange( } } + /** + * Returns true if all entries in the sst files of the given DB are strictly within the expected + * key-group range for the DB. + * + * @param db the DB to check. + * @param dbExpectedKeyGroupRange the expected key-groups range of the DB. + * @param keyGroupPrefixBytes the number of bytes used to serialize the key-group prefix of keys + * in the DB. + */ + public static boolean isSstDataInKeyGroupRange( + RocksDB db, int keyGroupPrefixBytes, KeyGroupRange dbExpectedKeyGroupRange) { + return checkSstDataAgainstKeyGroupRange(db, keyGroupPrefixBytes, dbExpectedKeyGroupRange) + .allInRange(); + } + + /** + * Returns a range compaction task as runnable if any data in the SST files of the given DB + * exceeds the proclaimed key-group range. + * + * @param db the DB to check and compact if needed. + * @param columnFamilyHandles list of column families to check. + * @param keyGroupPrefixBytes the number of bytes used to serialize the key-group prefix of keys + * in the DB. + * @param dbExpectedKeyGroupRange the expected key-groups range of the DB. + * @return runnable that performs compaction upon execution if the key-groups range is exceeded. + * Otherwise, an empty optional is returned. + */ + public static Optional<RunnableWithException> createRangeCompactionTaskIfNeeded( + RocksDB db, + Collection<ColumnFamilyHandle> columnFamilyHandles, + int keyGroupPrefixBytes, + KeyGroupRange dbExpectedKeyGroupRange) { + + RangeCheckResult rangeCheckResult = + checkSstDataAgainstKeyGroupRange(db, keyGroupPrefixBytes, dbExpectedKeyGroupRange); + + if (rangeCheckResult.allInRange()) { + // No keys exceed the proclaimed range of the backend, so we don't need a compaction + // from this point of view. + return Optional.empty(); + } + + return Optional.of( + () -> { + try (CompactRangeOptions compactionOptions = + new CompactRangeOptions() + .setExclusiveManualCompaction(true) + .setBottommostLevelCompaction( + CompactRangeOptions.BottommostLevelCompaction + .kForceOptimized)) { + + if (!rangeCheckResult.leftInRange) { + // Compact all keys before the expected key-groups range + for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { + db.compactRange( + columnFamilyHandle, + // TODO: change to null once this API is fixed + new byte[] {}, + rangeCheckResult.minKey, + compactionOptions); + } + } + + if (!rangeCheckResult.rightInRange) { + // Compact all keys after the expected key-groups range + for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { + db.compactRange( + columnFamilyHandle, + rangeCheckResult.maxKey, + // TODO: change to null once this API is fixed + new byte[] { + (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff + }, + compactionOptions); + } + } + } + }); + } + + /** + * Checks data in the SST files of the given DB for keys that exceed either the lower or the upper + * bound of the proclaimed key-groups range of the DB. + * + * @param db the DB to check. + * @param keyGroupPrefixBytes the number of bytes used to serialize the key-group prefix of keys + * in the DB. 
+ * @param dbExpectedKeyGroupRange the expected key-groups range of the DB. + * @return the check result with detailed info about lower and upper bound violations. + */ + private static RangeCheckResult checkSstDataAgainstKeyGroupRange( + RocksDB db, int keyGroupPrefixBytes, KeyGroupRange dbExpectedKeyGroupRange) { + final byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes]; + final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes]; + + CompositeKeySerializationUtils.serializeKeyGroup( + dbExpectedKeyGroupRange.getStartKeyGroup(), beginKeyGroupBytes); + + CompositeKeySerializationUtils.serializeKeyGroup( + dbExpectedKeyGroupRange.getEndKeyGroup() + 1, endKeyGroupBytes); + + KeyRange dbKeyRange = getDBKeyRange(db); + Comparator<byte[]> comparator = UnsignedBytes.lexicographicalComparator(); + return RangeCheckResult.of( + comparator.compare(dbKeyRange.minKey, beginKeyGroupBytes) >= 0, + comparator.compare(dbKeyRange.maxKey, endKeyGroupBytes) < 0, + beginKeyGroupBytes, + endKeyGroupBytes); + } + + /** Returns a pair of minimum and maximum key in the sst files of the given database. */ + private static KeyRange getDBKeyRange(RocksDB db) { + final Comparator<byte[]> comparator = UnsignedBytes.lexicographicalComparator(); + final List<LiveFileMetaData> liveFilesMetaData = db.getLiveFilesMetaData(); + + if (liveFilesMetaData.isEmpty()) { + return KeyRange.EMPTY; + } + + Iterator<LiveFileMetaData> liveFileMetaDataIterator = liveFilesMetaData.iterator(); + LiveFileMetaData fileMetaData = liveFileMetaDataIterator.next(); + byte[] smallestKey = fileMetaData.smallestKey(); + byte[] largestKey = fileMetaData.largestKey(); + while (liveFileMetaDataIterator.hasNext()) { + fileMetaData = liveFileMetaDataIterator.next(); + byte[] sstSmallestKey = fileMetaData.smallestKey(); + byte[] sstLargestKey = fileMetaData.largestKey(); + if (comparator.compare(sstSmallestKey, smallestKey) < 0) { + smallestKey = sstSmallestKey; + } + if (comparator.compare(sstLargestKey, largestKey) > 0) { + largestKey = sstLargestKey; + } + } + return KeyRange.of(smallestKey, largestKey); + } + + /** + * Exports the data of the given column families in the given DB. + * + * @param db the DB to export from. + * @param columnFamilyHandles the column families to export. + * @param registeredStateMetaInfoBases meta information about the registered states in the DB. + * @param exportBasePath the path to which the export files go. + * @param resultOutput output parameter for the metadata of the export. + * @throws RocksDBException on problems inside RocksDB. 
+ */ + public static void exportColumnFamilies( + RocksDB db, + List<ColumnFamilyHandle> columnFamilyHandles, + List<RegisteredStateMetaInfoBase> registeredStateMetaInfoBases, + Path exportBasePath, + Map<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>> resultOutput) + throws RocksDBException { + + Preconditions.checkArgument( + columnFamilyHandles.size() == registeredStateMetaInfoBases.size(), + "Lists are aligned by index and must be of the same size!"); + + try (final Checkpoint checkpoint = Checkpoint.create(db)) { + for (int i = 0; i < columnFamilyHandles.size(); i++) { + RegisteredStateMetaInfoBase stateMetaInfo = registeredStateMetaInfoBases.get(i); + + Path subPath = exportBasePath.resolve(UUID.randomUUID().toString()); + ExportImportFilesMetaData exportedColumnFamilyMetaData = + checkpoint.exportColumnFamily( + columnFamilyHandles.get(i), subPath.toString()); + + File[] exportedSstFiles = + subPath.toFile() + .listFiles((file, name) -> name.toLowerCase().endsWith(".sst")); + + if (exportedSstFiles != null && exportedSstFiles.length > 0) { + resultOutput + .computeIfAbsent(stateMetaInfo, (key) -> new ArrayList<>()) + .add(exportedColumnFamilyMetaData); + } else { + // Close unused empty export result + IOUtils.closeQuietly(exportedColumnFamilyMetaData); + } + } + } + } + /** check whether the bytes is before prefixBytes in the character order. */ public static boolean beforeThePrefixBytes(@Nonnull byte[] bytes, @Nonnull byte[] prefixBytes) { final int prefixLength = prefixBytes.length; @@ -172,26 +369,107 @@ public static boolean beforeThePrefixBytes(@Nonnull byte[] bytes, @Nonnull byte[ * * @param restoreStateHandles The candidate state handles. * @param targetKeyGroupRange The target key group range. + * @param overlapFractionThreshold configured threshold for overlap. * @return The best candidate or null if no candidate was a good fit. + * @param <T> the generic parameter type of the state handles. */ @Nullable public static <T extends KeyedStateHandle> T chooseTheBestStateHandleForInitial( - @Nonnull Collection<T> restoreStateHandles, + @Nonnull List<T> restoreStateHandles, + @Nonnull KeyGroupRange targetKeyGroupRange, + double overlapFractionThreshold) { + + int pos = + findTheBestStateHandleForInitial( + restoreStateHandles, targetKeyGroupRange, overlapFractionThreshold); + return pos >= 0 ? restoreStateHandles.get(pos) : null; + } + + /** + * Choose the best state handle according to the {@link #stateHandleEvaluator(KeyedStateHandle, + * KeyGroupRange, double)} to init the initial db from the given list and returns its index. + * + * @param restoreStateHandles The candidate state handles. + * @param targetKeyGroupRange The target key group range. + * @param overlapFractionThreshold configured threshold for overlap. + * @return the index of the best candidate handle in the list or -1 if the list was empty. + * @param <T> the generic parameter type of the state handles. 
+ */ + public static <T extends KeyedStateHandle> int findTheBestStateHandleForInitial( + @Nonnull List<T> restoreStateHandles, @Nonnull KeyGroupRange targetKeyGroupRange, double overlapFractionThreshold) { - T bestStateHandle = null; + if (restoreStateHandles.isEmpty()) { + return -1; + } + + // Shortcut for a common case (scale out) + if (restoreStateHandles.size() == 1) { + return 0; + } + + int currentPos = 0; + int bestHandlePos = 0; Score bestScore = Score.MIN; for (T rawStateHandle : restoreStateHandles) { Score handleScore = stateHandleEvaluator( rawStateHandle, targetKeyGroupRange, overlapFractionThreshold); - if (bestStateHandle == null || handleScore.compareTo(bestScore) > 0) { - bestStateHandle = rawStateHandle; + if (handleScore.compareTo(bestScore) > 0) { + bestHandlePos = currentPos; + bestScore = handleScore; } + ++currentPos; } + return bestHandlePos; + } + + /** Helper class that defines a key-range in RocksDB as byte arrays for min and max key. */ + private static final class KeyRange { + static final KeyRange EMPTY = KeyRange.of(new byte[0], new byte[0]); - return bestStateHandle; + final byte[] minKey; + final byte[] maxKey; + + private KeyRange(byte[] minKey, byte[] maxKey) { + this.minKey = minKey; + this.maxKey = maxKey; + } + + static KeyRange of(byte[] minKey, byte[] maxKey) { + return new KeyRange(minKey, maxKey); + } + } + + /** + * Helper class that represents the result of a range check of the actual keys in a RocksDB + * instance against the proclaimed key-group range of the instance. In short, this checks if the + * instance contains any keys (or tombstones for keys) that don't belong in the instance's + * key-groups range. + */ + private static final class RangeCheckResult { + private final byte[] minKey; + + private final byte[] maxKey; + final boolean leftInRange; + final boolean rightInRange; + + private RangeCheckResult( + boolean leftInRange, boolean rightInRange, byte[] minKey, byte[] maxKey) { + this.leftInRange = leftInRange; + this.rightInRange = rightInRange; + this.minKey = minKey; + this.maxKey = maxKey; + } + + boolean allInRange() { + return leftInRange && rightInRange; + } + + static RangeCheckResult of( + boolean leftInRange, boolean rightInRange, byte[] minKey, byte[] maxKey) { + return new RangeCheckResult(leftInRange, rightInRange, minKey, maxKey); + } } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index c91b63963f688..8e531ecebf86f 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -80,6 +80,7 @@ import javax.annotation.Nonnegative; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; @@ -88,8 +89,10 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Spliterator; import java.util.Spliterators; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.RunnableFuture; import java.util.function.Function; import java.util.stream.Collectors; @@ -259,6 +262,8 @@ IS updateState( private final RocksDbTtlCompactFiltersManager ttlCompactFiltersManager; + 
@Nullable private final CompletableFuture<Void> asyncCompactAfterRestoreFuture; + public RocksDBKeyedStateBackend( ClassLoader userCodeClassLoader, File instanceBasePath, @@ -284,7 +289,8 @@ public RocksDBKeyedStateBackend( PriorityQueueSetFactory priorityQueueFactory, RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, InternalKeyContext<K> keyContext, - @Nonnegative long writeBatchSize) { + @Nonnegative long writeBatchSize, + @Nullable CompletableFuture<Void> asyncCompactFuture) { super( kvStateRegistry, @@ -321,6 +327,7 @@ public RocksDBKeyedStateBackend( this.nativeMetricMonitor = nativeMetricMonitor; this.sharedRocksKeyBuilder = sharedRocksKeyBuilder; this.priorityQueueFactory = priorityQueueFactory; + this.asyncCompactAfterRestoreFuture = asyncCompactFuture; if (priorityQueueFactory instanceof HeapPriorityQueueSetFactory) { this.heapPriorityQueuesManager = new HeapPriorityQueuesManager( @@ -994,4 +1001,8 @@ public void compactState(StateDescriptor stateDesc) throws RocksDBExceptio long getWriteBatchSize() { return writeBatchSize; } + + public Optional<CompletableFuture<Void>> getAsyncCompactAfterRestoreFuture() { + return Optional.ofNullable(asyncCompactAfterRestoreFuture); + } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index 3f67a8b5f85e9..0d0b8e63992b2 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -52,6 +52,7 @@ import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper; import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.FileUtils; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; @@ -74,9 +75,12 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.function.Function; +import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE; import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD; +import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE; import static org.apache.flink.util.Preconditions.checkArgument; /** @@ -124,7 +128,10 @@ public class RocksDBKeyedStateBackendBuilder extends AbstractKeyedStateBacken RocksDBConfigurableOptions.WRITE_BATCH_SIZE.defaultValue().getBytes(); private RocksDB injectedTestDB; // for testing + private boolean incrementalRestoreAsyncCompactAfterRescale = + INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE.defaultValue(); private double overlapFractionThreshold = RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue(); + private boolean useIngestDbRestoreMode = USE_INGEST_DB_RESTORE_MODE.defaultValue(); private ColumnFamilyHandle injectedDefaultColumnFamilyHandle; // for testing private RocksDBStateUploader injectRocksDBStateUploader; // for testing @@ -269,6 +276,18 @@ RocksDBKeyedStateBackendBuilder<K> 
setOverlapFractionThreshold( return this; } + RocksDBKeyedStateBackendBuilder setIncrementalRestoreAsyncCompactAfterRescale( + boolean incrementalRestoreAsyncCompactAfterRescale) { + this.incrementalRestoreAsyncCompactAfterRescale = + incrementalRestoreAsyncCompactAfterRescale; + return this; + } + + RocksDBKeyedStateBackendBuilder setUseIngestDbRestoreMode(boolean useIngestDbRestoreMode) { + this.useIngestDbRestoreMode = useIngestDbRestoreMode; + return this; + } + public static File getInstanceRocksDBPath(File instanceBasePath) { return new File(instanceBasePath, DB_INSTANCE_DIR_STRING); } @@ -296,6 +315,7 @@ public RocksDBKeyedStateBackend build() throws BackendBuildingException { new LinkedHashMap<>(); RocksDB db = null; RocksDBRestoreOperation restoreOperation = null; + CompletableFuture asyncCompactAfterRestoreFuture = null; RocksDbTtlCompactFiltersManager ttlCompactFiltersManager = new RocksDbTtlCompactFiltersManager(ttlTimeProvider); @@ -334,6 +354,8 @@ public RocksDBKeyedStateBackend build() throws BackendBuildingException { db = restoreResult.getDb(); defaultColumnFamilyHandle = restoreResult.getDefaultColumnFamilyHandle(); nativeMetricMonitor = restoreResult.getNativeMetricMonitor(); + asyncCompactAfterRestoreFuture = + restoreResult.getAsyncCompactAfterRestoreFuture().orElse(null); if (restoreOperation instanceof RocksDBIncrementalRestoreOperation) { backendUID = restoreResult.getBackendUID(); materializedSstFiles = restoreResult.getRestoredSstFiles(); @@ -439,7 +461,8 @@ public RocksDBKeyedStateBackend build() throws BackendBuildingException { priorityQueueFactory, ttlCompactFiltersManager, keyContext, - writeBatchSize); + writeBatchSize, + asyncCompactAfterRestoreFuture); } private RocksDBRestoreOperation getRocksDBRestoreOperation( @@ -449,7 +472,7 @@ private RocksDBRestoreOperation getRocksDBRestoreOperation( LinkedHashMap> registeredPQStates, RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) { DBOptions dbOptions = optionsContainer.getDbOptions(); - if (restoreStateHandles.isEmpty()) { + if (CollectionUtil.isEmptyOrAllElementsNull(restoreStateHandles)) { return new RocksDBNoneRestoreOperation<>( kvStateInformation, instanceRocksDBPath, @@ -478,11 +501,14 @@ private RocksDBRestoreOperation getRocksDBRestoreOperation( nativeMetricOptions, metricGroup, customInitializationMetrics, - restoreStateHandles, + CollectionUtil.checkedSubTypeCast( + restoreStateHandles, IncrementalKeyedStateHandle.class), ttlCompactFiltersManager, writeBatchSize, optionsContainer.getWriteBufferManagerCapacity(), - overlapFractionThreshold); + overlapFractionThreshold, + useIngestDbRestoreMode, + incrementalRestoreAsyncCompactAfterRescale); } else if (priorityQueueConfig.getPriorityQueueStateType() == EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP) { return new RocksDBHeapTimersFullRestoreOperation<>( diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java index 32f64022a16e9..c3a9549ccd8eb 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java @@ -131,8 +131,16 @@ public static RocksDBKeyedStateBackend.RocksDbKvStateInfo createStateInfo( 
columnFamilyOptionsFactory, ttlCompactFiltersManager, writeBufferManagerCapacity); - return new RocksDBKeyedStateBackend.RocksDbKvStateInfo( - createColumnFamily(columnFamilyDescriptor, db), metaInfoBase); + + final ColumnFamilyHandle columnFamilyHandle; + try { + columnFamilyHandle = createColumnFamily(columnFamilyDescriptor, db); + } catch (Exception ex) { + IOUtils.closeQuietly(columnFamilyDescriptor.getOptions()); + throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", ex); + } + + return new RocksDBKeyedStateBackend.RocksDbKvStateInfo(columnFamilyHandle, metaInfoBase); } /** @@ -146,15 +154,17 @@ public static ColumnFamilyDescriptor createColumnFamilyDescriptor( @Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, @Nullable Long writeBufferManagerCapacity) { + byte[] nameBytes = metaInfoBase.getName().getBytes(ConfigConstants.DEFAULT_CHARSET); + Preconditions.checkState( + !Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes), + "The chosen state name 'default' collides with the name of the default column family!"); + ColumnFamilyOptions options = createColumnFamilyOptions(columnFamilyOptionsFactory, metaInfoBase.getName()); + if (ttlCompactFiltersManager != null) { ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(metaInfoBase, options); } - byte[] nameBytes = metaInfoBase.getName().getBytes(ConfigConstants.DEFAULT_CHARSET); - Preconditions.checkState( - !Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes), - "The chosen state name 'default' collides with the name of the default column family!"); if (writeBufferManagerCapacity != null) { // It'd be great to perform the check earlier, e.g. when creating write buffer manager. @@ -181,8 +191,7 @@ public static ColumnFamilyDescriptor createColumnFamilyDescriptor( * @return true if sanity check passes, false otherwise */ static boolean sanityCheckArenaBlockSize( - long writeBufferSize, long arenaBlockSizeConfigured, long writeBufferManagerCapacity) - throws IllegalStateException { + long writeBufferSize, long arenaBlockSizeConfigured, long writeBufferManagerCapacity) { long defaultArenaBlockSize = RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(writeBufferSize); @@ -221,13 +230,8 @@ public static ColumnFamilyOptions createColumnFamilyOptions( } private static ColumnFamilyHandle createColumnFamily( - ColumnFamilyDescriptor columnDescriptor, RocksDB db) { - try { - return db.createColumnFamily(columnDescriptor); - } catch (RocksDBException e) { - IOUtils.closeQuietly(columnDescriptor.getOptions()); - throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", e); - } + ColumnFamilyDescriptor columnDescriptor, RocksDB db) throws RocksDBException { + return db.createColumnFamily(columnDescriptor); } public static void addColumnFamilyOptionsToCloseLater( diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java index af1694c66e79d..850bda6bad76e 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java @@ -19,18 +19,14 @@ import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; -import 
org.apache.flink.runtime.state.StateBackend.CustomInitializationMetrics; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.IOUtils; -import org.apache.flink.util.clock.SystemClock; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.ThrowingRunnable; -import org.apache.flink.shaded.guava31.com.google.common.collect.Streams; - import java.io.IOException; import java.io.OutputStream; import java.nio.file.Files; @@ -41,17 +37,11 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.flink.runtime.metrics.MetricNames.DOWNLOAD_STATE_DURATION; - /** Help class for downloading RocksDB state files. */ public class RocksDBStateDownloader extends RocksDBStateDataTransfer { - private final CustomInitializationMetrics customInitializationMetrics; - - public RocksDBStateDownloader( - int restoringThreadNum, CustomInitializationMetrics customInitializationMetrics) { + public RocksDBStateDownloader(int restoringThreadNum) { super(restoringThreadNum); - this.customInitializationMetrics = customInitializationMetrics; } /** @@ -70,15 +60,11 @@ public void transferAllStateDataToDirectory( // Make sure we also react to external close signals. closeableRegistry.registerCloseable(internalCloser); try { - long startTimeMs = SystemClock.getInstance().relativeTimeMillis(); List> futures = transferAllStateDataToDirectoryAsync(downloadRequests, internalCloser) .collect(Collectors.toList()); // Wait until either all futures completed successfully or one failed exceptionally. FutureUtils.completeAll(futures).get(); - customInitializationMetrics.addMetric( - DOWNLOAD_STATE_DURATION, - SystemClock.getInstance().relativeTimeMillis() - startTimeMs); } catch (Exception e) { downloadRequests.stream() .map(StateHandleDownloadSpec::getDownloadDestination) @@ -108,7 +94,7 @@ private Stream> transferAllStateDataToDirectoryAsync( .flatMap( downloadRequest -> // Take all files from shared and private state. 
- Streams.concat( + Stream.concat( downloadRequest.getStateHandle().getSharedState() .stream(), downloadRequest.getStateHandle().getPrivateState() diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java index 4005adde5655c..228ba065e1175 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java @@ -108,6 +108,7 @@ public RocksDBRestoreResult restore() this.rocksHandle.getNativeMetricMonitor(), -1, null, + null, null); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java index 6c1d4625fb65e..25fbb95bc79ca 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java @@ -185,6 +185,35 @@ RocksDbKvStateInfo getOrRegisterStateColumnFamilyHandle( return registeredStateMetaInfoEntry; } + /** + * Registers a new column family and imports data from the given export. + * + * @param stateMetaInfo info about the state to create. + * @param cfMetaDataList the data to import. + */ + void registerStateColumnFamilyHandleWithImport( + RegisteredStateMetaInfoBase stateMetaInfo, + List<ExportImportFilesMetaData> cfMetaDataList) { + + Preconditions.checkState(!kvStateInformation.containsKey(stateMetaInfo.getName())); + + RocksDbKvStateInfo stateInfo = + RocksDBOperationUtils.createStateInfo( + stateMetaInfo, + db, + columnFamilyOptionsFactory, + ttlCompactFiltersManager, + writeBufferManagerCapacity, + cfMetaDataList); + + RocksDBOperationUtils.registerKvStateInformation( + kvStateInformation, nativeMetricMonitor, stateMetaInfo.getName(), stateInfo); + + columnFamilyHandles.add(stateInfo.columnFamilyHandle); + } + /** * This recreates the new working directory of the recovered RocksDB instance and links/copies * the contents from a local state. 
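As context for registerStateColumnFamilyHandleWithImport above, a short sketch of how the export side produces the cfMetaDataList input with the RocksJava checkpoint API used in exportColumnFamilies (db, columnFamilyHandle and exportPath are assumed to exist; RocksDBException handling is elided):

    try (Checkpoint checkpoint = Checkpoint.create(db)) {
        // Writes the column family's SST files into the export directory and returns
        // the metadata that an import-based registration can later consume.
        ExportImportFilesMetaData exported =
                checkpoint.exportColumnFamily(columnFamilyHandle, exportPath.toString());
    }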
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHeapTimersFullRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHeapTimersFullRestoreOperation.java index 0c859e88f6b8d..649aa572a62e6 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHeapTimersFullRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHeapTimersFullRestoreOperation.java @@ -138,6 +138,7 @@ public RocksDBRestoreResult restore() this.rocksHandle.getNativeMetricMonitor(), -1, null, + null, null); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java index faac6bee9bf39..c96d5d6422445 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java @@ -41,7 +41,6 @@ import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; -import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; import org.apache.flink.runtime.state.StateBackend.CustomInitializationMetrics; import org.apache.flink.runtime.state.StateSerializerProvider; @@ -51,6 +50,9 @@ import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StateMigrationException; +import org.apache.flink.util.clock.SystemClock; +import org.apache.flink.util.concurrent.FutureUtils; +import org.apache.flink.util.function.RunnableWithException; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; @@ -71,15 +73,20 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.SortedMap; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.Function; +import static org.apache.flink.runtime.metrics.MetricNames.DOWNLOAD_STATE_DURATION; +import static org.apache.flink.runtime.metrics.MetricNames.RESTORE_ASYNC_COMPACTION_DURATION; +import static org.apache.flink.runtime.metrics.MetricNames.RESTORE_STATE_DURATION; import static org.apache.flink.runtime.state.StateUtil.unexpectedStateHandleException; /** Encapsulates the process of restoring a RocksDB instance from an incremental snapshot. 
*/ @@ -88,10 +95,18 @@ public class RocksDBIncrementalRestoreOperation implements RocksDBRestoreOper private static final Logger logger = LoggerFactory.getLogger(RocksDBIncrementalRestoreOperation.class); + @SuppressWarnings("unchecked") + private static final Class<? extends IncrementalKeyedStateHandle>[] + EXPECTED_STATE_HANDLE_CLASSES = + new Class[] { + IncrementalRemoteKeyedStateHandle.class, + IncrementalLocalKeyedStateHandle.class + }; + private final String operatorIdentifier; private final SortedMap<Long, Collection<HandleAndLocalPath>> restoredSstFiles; private final RocksDBHandle rocksHandle; - private final Collection<KeyedStateHandle> restoreStateHandles; + private final Collection<IncrementalKeyedStateHandle> restoreStateHandles; private final CloseableRegistry cancelStreamRegistry; private final KeyGroupRange keyGroupRange; private final File instanceBasePath; @@ -107,6 +122,10 @@ public class RocksDBIncrementalRestoreOperation implements RocksDBRestoreOper private boolean isKeySerializerCompatibilityChecked; + private final boolean useIngestDbRestoreMode; + + private final boolean asyncCompactAfterRescale; + public RocksDBIncrementalRestoreOperation( String operatorIdentifier, KeyGroupRange keyGroupRange, @@ -123,11 +142,13 @@ public RocksDBIncrementalRestoreOperation( RocksDBNativeMetricOptions nativeMetricOptions, MetricGroup metricGroup, CustomInitializationMetrics customInitializationMetrics, - @Nonnull Collection<KeyedStateHandle> restoreStateHandles, + @Nonnull Collection<IncrementalKeyedStateHandle> restoreStateHandles, @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, @Nonnegative long writeBatchSize, Long writeBufferManagerCapacity, - double overlapFractionThreshold) { + double overlapFractionThreshold, + boolean useIngestDbRestoreMode, + boolean asyncCompactAfterRescale) { this.rocksHandle = new RocksDBHandle( kvStateInformation, @@ -153,9 +174,16 @@ public RocksDBIncrementalRestoreOperation( this.keyGroupPrefixBytes = keyGroupPrefixBytes; this.keySerializerProvider = keySerializerProvider; this.userCodeClassLoader = userCodeClassLoader; + this.useIngestDbRestoreMode = useIngestDbRestoreMode; + this.asyncCompactAfterRescale = asyncCompactAfterRescale; } - /** Root method that branches for different implementations of {@link KeyedStateHandle}. */ + /** + * Root method that branches for different implementations of {@link + * IncrementalKeyedStateHandle}. + */ @Override public RocksDBRestoreResult restore() throws Exception { @@ -163,53 +191,484 @@ public RocksDBRestoreResult restore() throws Exception { return null; } - final KeyedStateHandle theFirstStateHandle = restoreStateHandles.iterator().next(); + logger.info( + "Starting RocksDB incremental recovery in operator {} " + + "with target key-group range {}. 
Use IngestDB={}, Use AsyncCompaction={}, State Handles={}", + operatorIdentifier, + keyGroupRange.prettyPrintInterval(), + useIngestDbRestoreMode, + asyncCompactAfterRescale, + restoreStateHandles); - boolean isRescaling = - (restoreStateHandles.size() > 1 - || !Objects.equals(theFirstStateHandle.getKeyGroupRange(), keyGroupRange)); + final List<StateHandleDownloadSpec> allDownloadSpecs = + new ArrayList<>(restoreStateHandles.size()); + + final List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles = + new ArrayList<>(restoreStateHandles.size()); - if (isRescaling) { - restoreWithRescaling(restoreStateHandles); + final Path absolutInstanceBasePath = instanceBasePath.getAbsoluteFile().toPath(); + + try { + runAndReportDuration( + () -> + makeAllStateHandlesLocal( + absolutInstanceBasePath, + localKeyedStateHandles, + allDownloadSpecs), + DOWNLOAD_STATE_DURATION); + + runAndReportDuration( + () -> restoreFromLocalState(localKeyedStateHandles), RESTORE_STATE_DURATION); + + CompletableFuture<Void> asyncCompactFuture = null; + if (asyncCompactAfterRescale) { + asyncCompactFuture = + RocksDBIncrementalCheckpointUtils.createRangeCompactionTaskIfNeeded( + rocksHandle.getDb(), + rocksHandle.getColumnFamilyHandles(), + keyGroupPrefixBytes, + keyGroupRange) + .map( + (run) -> { + RunnableWithException runWithLogging = + () -> { + long t = System.currentTimeMillis(); + logger.info( + "Starting async compaction after restore for backend {} in operator {}", + backendUID, + operatorIdentifier); + try { + runAndReportDuration( + run, + RESTORE_ASYNC_COMPACTION_DURATION); + logger.info( + "Completed async compaction after restore for backend {} in operator {} after {} ms.", + backendUID, + operatorIdentifier, + System.currentTimeMillis() - t); + } catch (Exception ex) { + logger.warn( + "Failed async compaction after restore for backend {} in operator {} after {} ms.", + backendUID, + operatorIdentifier, + System.currentTimeMillis() - t, + ex); + throw ex; + } + }; + ExecutorService executorService = + Executors.newSingleThreadExecutor(); + CompletableFuture<Void> resultFuture = + FutureUtils.runAsync( + runWithLogging, executorService); + executorService.shutdown(); + return resultFuture; + }) + .orElse(null); + } + + logger.info( + "Finished RocksDB incremental recovery in operator {} with " + + "target key-group range {}.", + operatorIdentifier, + keyGroupRange.prettyPrintInterval()); + + return new RocksDBRestoreResult( + this.rocksHandle.getDb(), + this.rocksHandle.getDefaultColumnFamilyHandle(), + this.rocksHandle.getNativeMetricMonitor(), + lastCompletedCheckpointId, + backendUID, + restoredSstFiles, + asyncCompactFuture); + } finally { + // Cleanup all download directories + allDownloadSpecs.stream() + .map(StateHandleDownloadSpec::getDownloadDestination) + .forEach(this::cleanUpPathQuietly); + } + } + + private void restoreFromLocalState( + List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles) throws Exception { + if (localKeyedStateHandles.size() == 1) { + // This happens if we don't rescale and for some scale out scenarios. + initBaseDBFromSingleStateHandle(localKeyedStateHandles.get(0)); } else { - restoreWithoutRescaling(theFirstStateHandle); + // This happens for all scale ins and some scale outs. + restoreFromMultipleStateHandles(localKeyedStateHandles); } - return new RocksDBRestoreResult( - this.rocksHandle.getDb(), - this.rocksHandle.getDefaultColumnFamilyHandle(), - this.rocksHandle.getNativeMetricMonitor(), - lastCompletedCheckpointId, - backendUID, - restoredSstFiles); } - /** Recovery from a single remote incremental state without rescaling. 
*/ - @SuppressWarnings("unchecked") - private void restoreWithoutRescaling(KeyedStateHandle keyedStateHandle) throws Exception { + /** + * Downloads and converts all {@link IncrementalRemoteKeyedStateHandle}s to {@link + * IncrementalLocalKeyedStateHandle}s. + * + * @param absolutInstanceBasePath the base path of the restoring DB instance as absolute path. + * @param localKeyedStateHandlesOut the output parameter for the created {@link + * IncrementalLocalKeyedStateHandle}s. + * @param allDownloadSpecsOut output parameter for the created download specs. + * @throws Exception if an unexpected state handle type is passed as argument. + */ + private void makeAllStateHandlesLocal( + Path absolutInstanceBasePath, + List localKeyedStateHandlesOut, + List allDownloadSpecsOut) + throws Exception { + // Prepare and collect all the download request to pull remote state to a local directory + for (IncrementalKeyedStateHandle stateHandle : restoreStateHandles) { + if (stateHandle instanceof IncrementalRemoteKeyedStateHandle) { + StateHandleDownloadSpec downloadRequest = + new StateHandleDownloadSpec( + (IncrementalRemoteKeyedStateHandle) stateHandle, + absolutInstanceBasePath.resolve(UUID.randomUUID().toString())); + allDownloadSpecsOut.add(downloadRequest); + } else if (stateHandle instanceof IncrementalLocalKeyedStateHandle) { + localKeyedStateHandlesOut.add((IncrementalLocalKeyedStateHandle) stateHandle); + } else { + throw unexpectedStateHandleException( + EXPECTED_STATE_HANDLE_CLASSES, stateHandle.getClass()); + } + } + + allDownloadSpecsOut.stream() + .map(StateHandleDownloadSpec::createLocalStateHandleForDownloadedState) + .forEach(localKeyedStateHandlesOut::add); + + transferRemoteStateToLocalDirectory(allDownloadSpecsOut); + } + + /** + * Initializes the base DB that we restore from a single local state handle. + * + * @param stateHandle the state handle to restore the base DB from. + * @throws Exception on any error during restore. + */ + private void initBaseDBFromSingleStateHandle(IncrementalLocalKeyedStateHandle stateHandle) + throws Exception { + logger.info( - "Starting to restore from state handle: {} without rescaling.", keyedStateHandle); - if (keyedStateHandle instanceof IncrementalRemoteKeyedStateHandle) { - IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle = - (IncrementalRemoteKeyedStateHandle) keyedStateHandle; - restorePreviousIncrementalFilesStatus(incrementalRemoteKeyedStateHandle); - restoreBaseDBFromRemoteState(incrementalRemoteKeyedStateHandle); - } else if (keyedStateHandle instanceof IncrementalLocalKeyedStateHandle) { - IncrementalLocalKeyedStateHandle incrementalLocalKeyedStateHandle = - (IncrementalLocalKeyedStateHandle) keyedStateHandle; - restorePreviousIncrementalFilesStatus(incrementalLocalKeyedStateHandle); - restoreBaseDBFromLocalState(incrementalLocalKeyedStateHandle); + "Starting opening base RocksDB instance in operator {} with target key-group range {} from state handle {}.", + operatorIdentifier, + keyGroupRange.prettyPrintInterval(), + stateHandle); + + // Restore base DB from selected initial handle + restoreBaseDBFromLocalState(stateHandle); + + KeyGroupRange stateHandleKeyGroupRange = stateHandle.getKeyGroupRange(); + + // Check if the key-groups range has changed. + if (Objects.equals(stateHandleKeyGroupRange, keyGroupRange)) { + // This is the case if we didn't rescale, so we can restore all the info from the + // previous backend instance (backend id and incremental checkpoint history). 
+ restorePreviousIncrementalFilesStatus(stateHandle); } else { - throw unexpectedStateHandleException( - new Class[] { - IncrementalRemoteKeyedStateHandle.class, - IncrementalLocalKeyedStateHandle.class - }, - keyedStateHandle.getClass()); + // If the key-groups don't match, this was a scale out, and we need to clip the + // key-groups range of the db to the target range for this backend. + try { + RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange( + this.rocksHandle.getDb(), + this.rocksHandle.getColumnFamilyHandles(), + keyGroupRange, + stateHandleKeyGroupRange, + keyGroupPrefixBytes); + } catch (RocksDBException e) { + String errMsg = "Failed to clip DB after initialization."; + logger.error(errMsg, e); + throw new BackendBuildingException(errMsg, e); + } + } + logger.info( + "Finished opening base RocksDB instance in operator {} with target key-group range {}.", + operatorIdentifier, + keyGroupRange.prettyPrintInterval()); + } + + /** + * Initializes the base DB that we restore from a list of multiple local state handles. + * + * @param localKeyedStateHandles the list of state handles to restore the base DB from. + * @throws Exception on any error during restore. + */ + private void restoreFromMultipleStateHandles( + List localKeyedStateHandles) throws Exception { + + logger.info( + "Starting to restore backend with range {} in operator {} from multiple state handles {} with useIngestDbRestoreMode = {}.", + keyGroupRange.prettyPrintInterval(), + operatorIdentifier, + localKeyedStateHandles, + useIngestDbRestoreMode); + + byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes]; + CompositeKeySerializationUtils.serializeKeyGroup( + keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes); + + byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes]; + CompositeKeySerializationUtils.serializeKeyGroup( + keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes); + + if (useIngestDbRestoreMode) { + // Optimized path for merging multiple handles with Ingest/Clip + mergeStateHandlesWithClipAndIngest( + localKeyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes); + } else { + // Optimized path for single handle and legacy path for merging multiple handles. + mergeStateHandlesWithCopyFromTemporaryInstance( + localKeyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes); + } + + logger.info( + "Completed restoring backend with range {} in operator {} from multiple state handles with useIngestDbRestoreMode = {}.", + keyGroupRange.prettyPrintInterval(), + operatorIdentifier, + useIngestDbRestoreMode); + } + + /** + * Restores the base DB by merging multiple state handles into one. This method first checks if + * all data to import is in the expected key-groups range and then uses import/export. + * Otherwise, this method falls back to copying the data using a temporary DB. + * + * @param localKeyedStateHandles the list of state handles to restore the base DB from. + * @param startKeyGroupPrefixBytes the min/start key of the key groups range as bytes. + * @param stopKeyGroupPrefixBytes the max+1/end key of the key groups range as bytes. + * @throws Exception on any restore error. 
+ */ + private void mergeStateHandlesWithClipAndIngest( + List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles, + byte[] startKeyGroupPrefixBytes, + byte[] stopKeyGroupPrefixBytes) + throws Exception { + + final Path absolutInstanceBasePath = instanceBasePath.getAbsoluteFile().toPath(); + final Path exportCfBasePath = absolutInstanceBasePath.resolve("export-cfs"); + Files.createDirectories(exportCfBasePath); + + final Map<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>> + exportedColumnFamilyMetaData = new HashMap<>(localKeyedStateHandles.size()); + + final List<IncrementalLocalKeyedStateHandle> notImportableHandles = + new ArrayList<>(localKeyedStateHandles.size()); + + try { + + KeyGroupRange exportedSstKeyGroupsRange = + exportColumnFamiliesWithSstDataInKeyGroupsRange( + exportCfBasePath, + localKeyedStateHandles, + exportedColumnFamilyMetaData, + notImportableHandles); + + if (exportedColumnFamilyMetaData.isEmpty()) { + // Nothing could be exported, so we fall back to + // #mergeStateHandlesWithCopyFromTemporaryInstance + mergeStateHandlesWithCopyFromTemporaryInstance( + notImportableHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes); + } else { + // We initialize the base DB by importing all the exported data. + initBaseDBFromColumnFamilyImports( + exportedColumnFamilyMetaData, exportedSstKeyGroupsRange); + // Copy data from handles that we couldn't directly import using temporary + // instances. + copyToBaseDBUsingTempDBs( + notImportableHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes); + } + } finally { + // Close native RocksDB objects + exportedColumnFamilyMetaData.values().forEach(IOUtils::closeAllQuietly); + // Cleanup export base directory + cleanUpPathQuietly(exportCfBasePath); + } + } + + /** + * Prepares the data for importing by exporting from temporary RocksDB instances. We can only + * import data that does not exceed the target key-groups range and must skip state handles that + * exceed their range. + * + * @param exportCfBasePath the base path for the export files. + * @param localKeyedStateHandles the state handles to prepare for import. + * @param exportedColumnFamiliesOut output parameter for the metadata of completed exports. + * @param skipped output parameter for state handles that could not be exported because the data + * exceeded the proclaimed range. + * @return the total key-groups range of the exported data. + * @throws Exception on any export error. + */ + private KeyGroupRange exportColumnFamiliesWithSstDataInKeyGroupsRange( + Path exportCfBasePath, + List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles, + Map<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>> + exportedColumnFamiliesOut, + List<IncrementalLocalKeyedStateHandle> skipped) + throws Exception { + + logger.info( + "Starting restore export for backend with range {} in operator {}.", + keyGroupRange, + operatorIdentifier); + + int minExportKeyGroup = Integer.MAX_VALUE; + int maxExportKeyGroup = Integer.MIN_VALUE; + for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) { + try (RestoredDBInstance tmpRestoreDBInfo = + restoreTempDBInstanceFromLocalState(stateHandle)) { + + List<ColumnFamilyHandle> tmpColumnFamilyHandles = + tmpRestoreDBInfo.columnFamilyHandles; + + // Check if the data in all SST files referenced in the handle is within the + // proclaimed key-groups range of the handle. 
+ if (RocksDBIncrementalCheckpointUtils.isSstDataInKeyGroupRange( + tmpRestoreDBInfo.db, keyGroupPrefixBytes, stateHandle.getKeyGroupRange())) { + + logger.debug( + "Exporting state handle {} for backend with range {} in operator {}.", + stateHandle, + keyGroupRange, + operatorIdentifier); + + List<RegisteredStateMetaInfoBase> registeredStateMetaInfoBases = + tmpRestoreDBInfo.stateMetaInfoSnapshots.stream() + .map(RegisteredStateMetaInfoBase::fromMetaInfoSnapshot) + .collect(Collectors.toList()); + + // Export all the Column Families and store the result in + // exportedColumnFamiliesOut + RocksDBIncrementalCheckpointUtils.exportColumnFamilies( + tmpRestoreDBInfo.db, + tmpColumnFamilyHandles, + registeredStateMetaInfoBases, + exportCfBasePath, + exportedColumnFamiliesOut); + + minExportKeyGroup = + Math.min( + minExportKeyGroup, + stateHandle.getKeyGroupRange().getStartKeyGroup()); + maxExportKeyGroup = + Math.max( + maxExportKeyGroup, + stateHandle.getKeyGroupRange().getEndKeyGroup()); + + logger.debug( + "Done exporting state handle {} for backend with range {} in operator {}.", + stateHandle, + keyGroupRange, + operatorIdentifier); + } else { + // Actual key range in files exceeds proclaimed range, cannot import. We + // will copy this handle using a temporary DB later. + skipped.add(stateHandle); + } + } + } + + logger.info( + "Completed restore export for backend with range {} in operator {}. Number of exported handles: {}. Skipped handles: {}.", + keyGroupRange, + operatorIdentifier, + localKeyedStateHandles.size() - skipped.size(), + skipped); + + return minExportKeyGroup <= maxExportKeyGroup + ? new KeyGroupRange(minExportKeyGroup, maxExportKeyGroup) + : KeyGroupRange.EMPTY_KEY_GROUP_RANGE; + } + + /** + * Helper method that merges the data from multiple state handles into the restoring base DB with + * the help of copying through temporary RocksDB instances. + * + * @param localKeyedStateHandles the state handles to merge into the base DB. + * @param startKeyGroupPrefixBytes the min/start key of the key groups range as bytes. + * @param stopKeyGroupPrefixBytes the max+1/end key of the key groups range as bytes. + * @throws Exception on any merge error. + */ + private void mergeStateHandlesWithCopyFromTemporaryInstance( + List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles, + byte[] startKeyGroupPrefixBytes, + byte[] stopKeyGroupPrefixBytes) + throws Exception { + logger.info( - "Finished restoring from state handle: {} without rescaling.", keyedStateHandle); + "Starting to merge state for backend with range {} in operator {} from multiple state handles using temporary instances.", + keyGroupRange.prettyPrintInterval(), + operatorIdentifier); + + // Choose the best state handle for the initial DB and remove it from the list, so that + // we don't restore it twice. + final IncrementalLocalKeyedStateHandle selectedInitialHandle = + localKeyedStateHandles.remove( + RocksDBIncrementalCheckpointUtils.findTheBestStateHandleForInitial( + localKeyedStateHandles, keyGroupRange, overlapFractionThreshold)); + + Preconditions.checkNotNull(selectedInitialHandle);
+ + // Init the base DB instance with the initial state + initBaseDBFromSingleStateHandle(selectedInitialHandle); + + // Copy remaining handles using temporary RocksDB instances + copyToBaseDBUsingTempDBs( + localKeyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes); + + logger.info( + "Completed merging state for backend with range {} in operator {} from multiple state handles using temporary instances.", + keyGroupRange.prettyPrintInterval(), + operatorIdentifier); + } + + /** + * Initializes the base DB by importing from previously exported data. + * + * @param exportedColumnFamilyMetaData the export (meta) data. + * @param exportKeyGroupRange the total key-groups range of the exported data. + * @throws Exception on import error. + */ + private void initBaseDBFromColumnFamilyImports( + Map<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>> + exportedColumnFamilyMetaData, + KeyGroupRange exportKeyGroupRange) + throws Exception { + + // We initialize the base DB by importing all the exported data. + logger.info( + "Starting to import exported state handles for backend with range {} in operator {} using Clip/Ingest DB.", + keyGroupRange, + operatorIdentifier); + rocksHandle.openDB(); + exportedColumnFamilyMetaData.forEach( + rocksHandle::registerStateColumnFamilyHandleWithImport); + + // Use range delete to clip the base DB to the target range of the backend + RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange( + rocksHandle.getDb(), + rocksHandle.getColumnFamilyHandles(), + keyGroupRange, + exportKeyGroupRange, + keyGroupPrefixBytes); + + logger.info( + "Completed importing exported state handles for backend with range {} in operator {} using Clip/Ingest DB.", + keyGroupRange, + operatorIdentifier); + } + + /** + * Restores the checkpointing status and state for this backend. This can only be done if the + * backend was not rescaled and is therefore identical to the source backend in the previous + * run. + * + * @param localKeyedStateHandle the single state handle from which the backend is restored. + */ private void restorePreviousIncrementalFilesStatus( IncrementalKeyedStateHandle localKeyedStateHandle) { backendUID = localKeyedStateHandle.getBackendIdentifier(); @@ -217,24 +676,20 @@ private void restorePreviousIncrementalFilesStatus( localKeyedStateHandle.getCheckpointId(), localKeyedStateHandle.getSharedStateHandles()); lastCompletedCheckpointId = localKeyedStateHandle.getCheckpointId(); + logger.info( + "Restored previous incremental files status in backend with range {} in operator {}: backend uuid {}, last checkpoint id {}.", + keyGroupRange.prettyPrintInterval(), + operatorIdentifier, + backendUID, + lastCompletedCheckpointId); } - private void restoreBaseDBFromRemoteState(IncrementalRemoteKeyedStateHandle stateHandle) - throws Exception { - // used as restore source for IncrementalRemoteKeyedStateHandle - final Path tmpRestoreInstancePath = - instanceBasePath.getAbsoluteFile().toPath().resolve(UUID.randomUUID().toString()); - final StateHandleDownloadSpec downloadRequest = - new StateHandleDownloadSpec(stateHandle, tmpRestoreInstancePath); - try { - transferRemoteStateToLocalDirectory(Collections.singletonList(downloadRequest)); - restoreBaseDBFromLocalState(downloadRequest.createLocalStateHandleForDownloadedState()); - } finally { - cleanUpPathQuietly(downloadRequest.getDownloadDestination()); - } - } - - /** Restores RocksDB instance from local state. */ + /** + * Restores the base DB from local state of a single state handle. 
+ * + * @param localKeyedStateHandle the state handle to restore from. + * @throws Exception on any restore error. + */ private void restoreBaseDBFromLocalState(IncrementalLocalKeyedStateHandle localKeyedStateHandle) throws Exception { KeyedBackendSerializationProxy serializationProxy = @@ -244,185 +699,155 @@ private void restoreBaseDBFromLocalState(IncrementalLocalKeyedStateHandle localK Path restoreSourcePath = localKeyedStateHandle.getDirectoryStateHandle().getDirectory(); - logger.debug( - "Restoring keyed backend uid in operator {} from incremental snapshot to {}.", - operatorIdentifier, - backendUID); - this.rocksHandle.openDB( createColumnFamilyDescriptors(stateMetaInfoSnapshots, true), stateMetaInfoSnapshots, restoreSourcePath); } + /** + * Helper method to download files, as specified in the given download specs, to the local + * directory. + * + * @param downloadSpecs specifications of files to download. + * @throws Exception on any download error. + */ private void transferRemoteStateToLocalDirectory( - Collection downloadRequests) throws Exception { + Collection<StateHandleDownloadSpec> downloadSpecs) throws Exception { + logger.info( + "Start downloading remote state to local directory in operator {} for target key-group range {}.", + operatorIdentifier, + keyGroupRange.prettyPrintInterval()); try (RocksDBStateDownloader rocksDBStateDownloader = - new RocksDBStateDownloader( - numberOfTransferringThreads, customInitializationMetrics)) { + new RocksDBStateDownloader(numberOfTransferringThreads)) { rocksDBStateDownloader.transferAllStateDataToDirectory( - downloadRequests, cancelStreamRegistry); - } - } - - private void cleanUpPathQuietly(@Nonnull Path path) { - try { - FileUtils.deleteDirectory(path.toFile()); - } catch (IOException ex) { - logger.warn("Failed to clean up path " + path, ex); + downloadSpecs, cancelStreamRegistry); + logger.info( + "Finished downloading remote state to local directory in operator {} for target key-group range {}.", + operatorIdentifier, + keyGroupRange.prettyPrintInterval()); } } /** - * Recovery from multi incremental states with rescaling. For rescaling, this method creates a - * temporary RocksDB instance for a key-groups shard. All contents from the temporary instance - * are copied into the real restore instance and then the temporary instance is discarded. + * Helper method to copy all data from the given local state handles to the base DB by using + * temporary DB instances. + * + * @param toImport the state handles to import. + * @param startKeyGroupPrefixBytes the min/start key of the key groups range as bytes. + * @param stopKeyGroupPrefixBytes the max+1/end key of the key groups range as bytes. + * @throws Exception on any copy error. 
*/ - private void restoreWithRescaling(Collection restoreStateHandles) + private void copyToBaseDBUsingTempDBs( + List<IncrementalLocalKeyedStateHandle> toImport, + byte[] startKeyGroupPrefixBytes, + byte[] stopKeyGroupPrefixBytes) throws Exception { - Preconditions.checkArgument(restoreStateHandles != null && !restoreStateHandles.isEmpty()); - - final List allDownloadSpecs = new ArrayList<>(); - - final List localKeyedStateHandles = - new ArrayList<>(restoreStateHandles.size()); - - final Path absolutInstanceBasePath = instanceBasePath.getAbsoluteFile().toPath(); + if (toImport.isEmpty()) { + return; + } - // Prepare and collect all the download request to pull remote state to a local directory - for (KeyedStateHandle stateHandle : restoreStateHandles) { - if (stateHandle instanceof IncrementalRemoteKeyedStateHandle) { - StateHandleDownloadSpec downloadRequest = - new StateHandleDownloadSpec( - (IncrementalRemoteKeyedStateHandle) stateHandle, - absolutInstanceBasePath.resolve(UUID.randomUUID().toString())); - allDownloadSpecs.add(downloadRequest); - } else if (stateHandle instanceof IncrementalLocalKeyedStateHandle) { - localKeyedStateHandles.add((IncrementalLocalKeyedStateHandle) stateHandle); - } else { - throw unexpectedStateHandleException( - IncrementalRemoteKeyedStateHandle.class, stateHandle.getClass()); + logger.info( + "Starting to copy state handles for backend with range {} in operator {} using temporary instances.", + keyGroupRange.prettyPrintInterval(), + operatorIdentifier); + + try (RocksDBWriteBatchWrapper writeBatchWrapper = + new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), writeBatchSize)) { + for (IncrementalLocalKeyedStateHandle handleToCopy : toImport) { + try (RestoredDBInstance restoredDBInstance = + restoreTempDBInstanceFromLocalState(handleToCopy)) { + copyTempDbIntoBaseDb( + restoredDBInstance, + writeBatchWrapper, + startKeyGroupPrefixBytes, + stopKeyGroupPrefixBytes); + } + } } - allDownloadSpecs.stream() - .map(StateHandleDownloadSpec::createLocalStateHandleForDownloadedState) - .forEach(localKeyedStateHandles::add); - - // Choose the best state handle for the initial DB - final IncrementalLocalKeyedStateHandle selectedInitialHandle = - RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial( - localKeyedStateHandles, keyGroupRange, overlapFractionThreshold); - Preconditions.checkNotNull(selectedInitialHandle); - // Remove the selected handle from the list so that we don't restore it twice. - localKeyedStateHandles.remove(selectedInitialHandle); - - try { - // Process all state downloads - transferRemoteStateToLocalDirectory(allDownloadSpecs); - - // Init the base DB instance with the initial state - initBaseDBForRescaling(selectedInitialHandle); - - // Transfer remaining key-groups from temporary instance into base DB - byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes]; - CompositeKeySerializationUtils.serializeKeyGroup( - keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes); + logger.info( + "Completed copying state handles for backend with range {} in operator {} using temporary instances.", + keyGroupRange.prettyPrintInterval(), + operatorIdentifier); + } - byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes]; - CompositeKeySerializationUtils.serializeKeyGroup( - keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes); + /** + * Helper method to copy all data from an open temporary DB to the base DB. + * + * @param tmpRestoreDBInfo the temporary instance. + * @param writeBatchWrapper write batch wrapper for writes against the base DB. 
+ * @param startKeyGroupPrefixBytes the min/start key of the key groups range as bytes. + * @param stopKeyGroupPrefixBytes the max+1/end key of the key groups range as bytes. + * @throws Exception on any copy error. + */ + private void copyTempDbIntoBaseDb( + RestoredDBInstance tmpRestoreDBInfo, + RocksDBWriteBatchWrapper writeBatchWrapper, + byte[] startKeyGroupPrefixBytes, + byte[] stopKeyGroupPrefixBytes) + throws Exception { - // Insert all remaining state through creating temporary RocksDB instances - for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) { - logger.info( - "Starting to restore from state handle: {} with rescaling.", stateHandle); - - try (RestoredDBInstance tmpRestoreDBInfo = - restoreTempDBInstanceFromLocalState(stateHandle); - RocksDBWriteBatchWrapper writeBatchWrapper = - new RocksDBWriteBatchWrapper( - this.rocksHandle.getDb(), writeBatchSize)) { - - List tmpColumnFamilyDescriptors = - tmpRestoreDBInfo.columnFamilyDescriptors; - List tmpColumnFamilyHandles = - tmpRestoreDBInfo.columnFamilyHandles; - - // iterating only the requested descriptors automatically skips the default - // column - // family handle - for (int descIdx = 0; descIdx < tmpColumnFamilyDescriptors.size(); ++descIdx) { - ColumnFamilyHandle tmpColumnFamilyHandle = - tmpColumnFamilyHandles.get(descIdx); - - ColumnFamilyHandle targetColumnFamilyHandle = - this.rocksHandle.getOrRegisterStateColumnFamilyHandle( - null, - tmpRestoreDBInfo.stateMetaInfoSnapshots.get( - descIdx)) - .columnFamilyHandle; - - try (RocksIteratorWrapper iterator = - RocksDBOperationUtils.getRocksIterator( - tmpRestoreDBInfo.db, - tmpColumnFamilyHandle, - tmpRestoreDBInfo.readOptions)) { - - iterator.seek(startKeyGroupPrefixBytes); - - while (iterator.isValid()) { - - if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes( - iterator.key(), stopKeyGroupPrefixBytes)) { - writeBatchWrapper.put( - targetColumnFamilyHandle, - iterator.key(), - iterator.value()); - } else { - // Since the iterator will visit the record according to the - // sorted - // order, - // we can just break here. 
- break; - } - - iterator.next(); - } - } // releases native iterator resources + logger.debug( + "Starting copy of state handle {} for backend with range {} in operator {} to base DB using temporary instance.", + tmpRestoreDBInfo.srcStateHandle, + keyGroupRange.prettyPrintInterval(), + operatorIdentifier); + + List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors = + tmpRestoreDBInfo.columnFamilyDescriptors; + List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles; + + // Iterating over only the requested descriptors automatically skips the default + // column family handle. + for (int descIdx = 0; descIdx < tmpColumnFamilyDescriptors.size(); ++descIdx) { + ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(descIdx); + + ColumnFamilyHandle targetColumnFamilyHandle = + this.rocksHandle.getOrRegisterStateColumnFamilyHandle( + null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(descIdx)) + .columnFamilyHandle; + + try (RocksIteratorWrapper iterator = + RocksDBOperationUtils.getRocksIterator( + tmpRestoreDBInfo.db, + tmpColumnFamilyHandle, + tmpRestoreDBInfo.readOptions)) { + + iterator.seek(startKeyGroupPrefixBytes); + + while (iterator.isValid()) { + + if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes( + iterator.key(), stopKeyGroupPrefixBytes)) { + writeBatchWrapper.put( + targetColumnFamilyHandle, iterator.key(), iterator.value()); + } else { + // Since the iterator visits records in sorted order, we can just + // break here. + break; + } - logger.info( - "Finished restoring from state handle: {} with rescaling.", - stateHandle); + + iterator.next(); } - } - } finally { - // Cleanup all download directories - allDownloadSpecs.stream() - .map(StateHandleDownloadSpec::getDownloadDestination) - .forEach(this::cleanUpPathQuietly); + } // releases native iterator resources + } + logger.debug( + "Finished copy of state handle {} for backend with range {} in operator {} using temporary instance.", + tmpRestoreDBInfo.srcStateHandle, + keyGroupRange.prettyPrintInterval(), + operatorIdentifier); } - private void initBaseDBForRescaling(IncrementalLocalKeyedStateHandle stateHandle) - throws Exception { - - // 1. Restore base DB from selected initial handle - restoreBaseDBFromLocalState(stateHandle); - - // 2. 
Clip the base DB instance + private void cleanUpPathQuietly(@Nonnull Path path) { try { - RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange( - this.rocksHandle.getDb(), - this.rocksHandle.getColumnFamilyHandles(), - keyGroupRange, - stateHandle.getKeyGroupRange(), - keyGroupPrefixBytes); - } catch (RocksDBException e) { - String errMsg = "Failed to clip DB after initialization."; - logger.error(errMsg, e); - throw new BackendBuildingException(errMsg, e); + FileUtils.deleteDirectory(path.toFile()); + } catch (IOException ex) { + logger.warn("Failed to clean up path " + path, ex); } } @@ -441,17 +866,21 @@ private static class RestoredDBInstance implements AutoCloseable { private final ReadOptions readOptions; + private final IncrementalLocalKeyedStateHandle srcStateHandle; + private RestoredDBInstance( @Nonnull RocksDB db, @Nonnull List columnFamilyHandles, @Nonnull List columnFamilyDescriptors, - @Nonnull List stateMetaInfoSnapshots) { + @Nonnull List stateMetaInfoSnapshots, + @Nonnull IncrementalLocalKeyedStateHandle srcStateHandle) { this.db = db; this.defaultColumnFamilyHandle = columnFamilyHandles.remove(0); this.columnFamilyHandles = columnFamilyHandles; this.columnFamilyDescriptors = columnFamilyDescriptors; this.stateMetaInfoSnapshots = stateMetaInfoSnapshots; this.readOptions = new ReadOptions(); + this.srcStateHandle = srcStateHandle; } @Override @@ -493,12 +922,16 @@ private RestoredDBInstance restoreTempDBInstanceFromLocalState( this.rocksHandle.getDbOptions()); return new RestoredDBInstance( - restoreDb, columnFamilyHandles, columnFamilyDescriptors, stateMetaInfoSnapshots); + restoreDb, + columnFamilyHandles, + columnFamilyDescriptors, + stateMetaInfoSnapshots, + stateHandle); } /** * This method recreates and registers all {@link ColumnFamilyDescriptor} from Flink's state - * meta data snapshot. + * metadata snapshot. */ private List createColumnFamilyDescriptors( List stateMetaInfoSnapshots, boolean registerTtlCompactFilter) { @@ -509,6 +942,7 @@ private List createColumnFamilyDescriptors( for (StateMetaInfoSnapshot stateMetaInfoSnapshot : stateMetaInfoSnapshots) { RegisteredStateMetaInfoBase metaInfoBase = RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot); + ColumnFamilyDescriptor columnFamilyDescriptor = RocksDBOperationUtils.createColumnFamilyDescriptor( metaInfoBase, @@ -523,6 +957,14 @@ private List createColumnFamilyDescriptors( return columnFamilyDescriptors; } + private void runAndReportDuration(RunnableWithException runnable, String metricName) + throws Exception { + final SystemClock clock = SystemClock.getInstance(); + final long startTime = clock.relativeTimeMillis(); + runnable.run(); + customInitializationMetrics.addMetric(metricName, clock.relativeTimeMillis() - startTime); + } + /** Reads Flink's state meta data file from the state handle. 
*/ private KeyedBackendSerializationProxy readMetaData(StreamStateHandle metaStateHandle) throws Exception { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java index 4202c899c59a1..b12d6ff1d159b 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java @@ -66,6 +66,7 @@ public RocksDBRestoreResult restore() throws Exception { this.rocksHandle.getNativeMetricMonitor(), -1, null, + null, null); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java index ad17b6f27690d..1e458eb45f479 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java @@ -24,9 +24,13 @@ import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDB; +import javax.annotation.Nullable; + import java.util.Collection; +import java.util.Optional; import java.util.SortedMap; import java.util.UUID; +import java.util.concurrent.CompletableFuture; /** Entity holding result of RocksDB instance restore. 
*/ public class RocksDBRestoreResult { @@ -39,19 +43,23 @@ public class RocksDBRestoreResult { private final UUID backendUID; private final SortedMap> restoredSstFiles; + private final CompletableFuture asyncCompactAfterRestoreFuture; + public RocksDBRestoreResult( RocksDB db, ColumnFamilyHandle defaultColumnFamilyHandle, RocksDBNativeMetricMonitor nativeMetricMonitor, long lastCompletedCheckpointId, UUID backendUID, - SortedMap> restoredSstFiles) { + SortedMap> restoredSstFiles, + @Nullable CompletableFuture asyncCompactAfterRestoreFuture) { this.db = db; this.defaultColumnFamilyHandle = defaultColumnFamilyHandle; this.nativeMetricMonitor = nativeMetricMonitor; this.lastCompletedCheckpointId = lastCompletedCheckpointId; this.backendUID = backendUID; this.restoredSstFiles = restoredSstFiles; + this.asyncCompactAfterRestoreFuture = asyncCompactAfterRestoreFuture; } public RocksDB getDb() { @@ -77,4 +85,8 @@ public ColumnFamilyHandle getDefaultColumnFamilyHandle() { public RocksDBNativeMetricMonitor getNativeMetricMonitor() { return nativeMetricMonitor; } + + public Optional> getAsyncCompactAfterRestoreFuture() { + return Optional.ofNullable(asyncCompactAfterRestoreFuture); + } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java index 03b1a087e2874..5feeeaf5608dd 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java @@ -24,22 +24,26 @@ import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; import org.apache.flink.runtime.state.ConfigurableStateBackend; import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.SharedStateRegistryImpl; import org.apache.flink.runtime.state.SharedStateRegistryKey; import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateBackendTestBase; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; @@ -51,6 +55,7 @@ import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; import 
org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.apache.flink.testutils.junit.utils.TempDirUtils; +import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.IOUtils; import org.apache.flink.util.function.SupplierWithException; @@ -80,12 +85,14 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Queue; import java.util.concurrent.RunnableFuture; import java.util.stream.Collectors; +import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Matchers.any; @@ -116,7 +123,14 @@ public static List modes() { { true, (SupplierWithException) - JobManagerCheckpointStorage::new + JobManagerCheckpointStorage::new, + false + }, + { + true, + (SupplierWithException) + JobManagerCheckpointStorage::new, + true }, { false, @@ -126,7 +140,8 @@ public static List modes() { TempDirUtils.newFolder(tempFolder).toURI().toString(); return new FileSystemCheckpointStorage( new Path(checkpointPath), 0, -1); - } + }, + false } }); } @@ -137,12 +152,16 @@ public static List modes() { @Parameter(value = 1) public SupplierWithException storageSupplier; + @Parameter(value = 2) + public boolean useIngestDB; + // Store it because we need it for the cleanup test. private String dbPath; private RocksDB db = null; private ColumnFamilyHandle defaultCFHandle = null; private RocksDBStateUploader rocksDBStateUploader = null; private final RocksDBResourceContainer optionsContainer = new RocksDBResourceContainer(); + private final HashMap initMetricBackingMap = new HashMap<>(); public void prepareRocksDB() throws Exception { String dbPath = @@ -168,6 +187,7 @@ protected ConfigurableStateBackend getStateBackend() throws IOException { EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(enableIncrementalCheckpointing); Configuration configuration = new Configuration(); + configuration.set(USE_INGEST_DB_RESTORE_MODE, useIngestDB); configuration.set( RocksDBOptions.TIMER_SERVICE_FACTORY, EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB); @@ -176,6 +196,14 @@ protected ConfigurableStateBackend getStateBackend() throws IOException { return backend; } + @Override + protected StateBackend.CustomInitializationMetrics getCustomInitializationMetrics() { + return (name, value) -> { + initMetricBackingMap.compute( + name, (key, oldValue) -> oldValue == null ? 
value : value + oldValue); + }; + } + @Override protected CheckpointStorage getCheckpointStorage() throws Exception { return storageSupplier.get(); @@ -234,7 +262,8 @@ public void setupRocksKeyedStateBackend() throws Exception { spy(db), defaultCFHandle, optionsContainer.getColumnOptions()) - .setEnableIncrementalCheckpointing(enableIncrementalCheckpointing); + .setEnableIncrementalCheckpointing(enableIncrementalCheckpointing) + .setUseIngestDbRestoreMode(useIngestDB); if (enableIncrementalCheckpointing) { rocksDBStateUploader = @@ -315,19 +344,18 @@ public Object answer(InvocationOnMock invocationOnMock) @TestTemplate public void testCorrectMergeOperatorSet() throws Exception { prepareRocksDB(); - final ColumnFamilyOptions columnFamilyOptions = spy(new ColumnFamilyOptions()); - RocksDBKeyedStateBackend test = null; - try { - test = - RocksDBTestUtils.builderForTestDB( - TempDirUtils.newFolder(tempFolder), - IntSerializer.INSTANCE, - db, - defaultCFHandle, - columnFamilyOptions) - .setEnableIncrementalCheckpointing(enableIncrementalCheckpointing) - .build(); + try (ColumnFamilyOptions columnFamilyOptions = spy(new ColumnFamilyOptions()); + RocksDBKeyedStateBackend test = + RocksDBTestUtils.builderForTestDB( + TempDirUtils.newFolder(tempFolder), + IntSerializer.INSTANCE, + db, + defaultCFHandle, + columnFamilyOptions) + .setEnableIncrementalCheckpointing(enableIncrementalCheckpointing) + .setUseIngestDbRestoreMode(useIngestDB) + .build()) { ValueStateDescriptor stubState1 = new ValueStateDescriptor<>("StubState-1", StringSerializer.INSTANCE); @@ -339,12 +367,6 @@ public void testCorrectMergeOperatorSet() throws Exception { // The default CF is pre-created so sum up to 2 times (once for each stub state) verify(columnFamilyOptions, Mockito.times(2)) .setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME); - } finally { - if (test != null) { - IOUtils.closeQuietly(test); - test.dispose(); - } - columnFamilyOptions.close(); } } @@ -665,6 +687,29 @@ private void verifyRocksDBStateUploaderClosed() { } } + protected CheckpointableKeyedStateBackend restoreKeyedBackend( + TypeSerializer keySerializer, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + List state, + Environment env) + throws Exception { + CheckpointableKeyedStateBackend restoreResult = + super.restoreKeyedBackend( + keySerializer, numberOfKeyGroups, keyGroupRange, state, env); + + // If something was restored, check that all expected metrics are present. + if (checkMetrics() && !CollectionUtil.isEmptyOrAllElementsNull(state)) { + assertThat(initMetricBackingMap.keySet()) + .containsExactlyInAnyOrder("RestoreStateDurationMs", "DownloadStateDurationMs"); + } + return restoreResult; + } + + protected boolean checkMetrics() { + return true; + } + private static class AcceptAllFilter implements IOFileFilter { @Override public boolean accept(File file) { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRecoveryTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRecoveryTest.java new file mode 100644 index 0000000000000..4a233ec06ba03 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRecoveryTest.java @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory; +import org.apache.flink.testutils.junit.utils.TempDirUtils; +import org.apache.flink.util.IOUtils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.RunnableFuture; +import java.util.stream.Collectors; + +import static org.apache.flink.core.fs.Path.fromLocalFile; +import static org.apache.flink.core.fs.local.LocalFileSystem.getSharedInstance; + +/** Rescaling test and microbenchmark for RocksDB. */ +public class RocksDBRecoveryTest { + + // Discards benchmark output by default; swap in System.out to see it on the console. 
+ private static final PrintStream OUTPUT = + new PrintStream( + new OutputStream() { + @Override + public void write(int b) {} + }); + + @TempDir private static java.nio.file.Path tempFolder; + + @Test + public void testScaleOut_1_2() throws Exception { + testRescale(1, 2, 100_000, 10); + } + + @Test + public void testScaleOut_2_8() throws Exception { + testRescale(2, 8, 100_000, 10); + } + + @Test + public void testScaleOut_2_7() throws Exception { + testRescale(2, 7, 100_000, 10); + } + + @Test + public void testScaleIn_2_1() throws Exception { + testRescale(2, 1, 100_000, 10); + } + + @Test + public void testScaleIn_8_2() throws Exception { + testRescale(8, 2, 100_000, 10); + } + + @Test + public void testScaleIn_7_2() throws Exception { + testRescale(7, 2, 100_000, 10); + } + + @Test + public void testScaleOut_2_3() throws Exception { + testRescale(2, 3, 100_000, 10); + } + + @Test + public void testScaleIn_3_2() throws Exception { + testRescale(3, 2, 100_000, 10); + } + + public void testRescale( + int startParallelism, int targetParallelism, int numKeys, int updateDistance) + throws Exception { + + OUTPUT.println("Rescaling from " + startParallelism + " to " + targetParallelism + "..."); + final String stateName = "TestValueState"; + final int maxParallelism = startParallelism * targetParallelism; + final List<RocksDBKeyedStateBackend<Integer>> backends = new ArrayList<>(maxParallelism); + final List<SnapshotResult<KeyedStateHandle>> startSnapshotResult = new ArrayList<>(); + final List<SnapshotResult<KeyedStateHandle>> rescaleSnapshotResult = new ArrayList<>(); + final List<SnapshotResult<KeyedStateHandle>> cleanupSnapshotResult = new ArrayList<>(); + try { + final List<ValueState<Integer>> valueStates = new ArrayList<>(maxParallelism); + try { + ValueStateDescriptor<Integer> stateDescriptor = + new ValueStateDescriptor<>(stateName, IntSerializer.INSTANCE); + + for (int i = 0; i < startParallelism; ++i) { + RocksDBKeyedStateBackend<Integer> backend = + RocksDBTestUtils.builderForTestDefaults( + TempDirUtils.newFolder(tempFolder), + IntSerializer.INSTANCE, + maxParallelism, + KeyGroupRangeAssignment + .computeKeyGroupRangeForOperatorIndex( + maxParallelism, startParallelism, i), + Collections.emptyList()) + .setEnableIncrementalCheckpointing(true) + .setUseIngestDbRestoreMode(true) + .build(); + + valueStates.add( + backend.getOrCreateKeyedState( + VoidNamespaceSerializer.INSTANCE, stateDescriptor)); + + backends.add(backend); + } + + OUTPUT.println("Inserting " + numKeys + " keys..."); + + for (int i = 1; i <= numKeys; ++i) { + int key = i; + int index = + KeyGroupRangeAssignment.assignKeyToParallelOperator( + key, maxParallelism, startParallelism); + backends.get(index).setCurrentKey(key); + valueStates.get(index).update(i); + + if (updateDistance > 0 && i % updateDistance == 0) { + key = i - updateDistance + 1; + index = + KeyGroupRangeAssignment.assignKeyToParallelOperator( + key, maxParallelism, startParallelism); + backends.get(index).setCurrentKey(key); + valueStates.get(index).update(i); + } + } + + OUTPUT.println("Creating snapshots..."); + snapshotAllBackends(backends, startSnapshotResult); + } finally { + for (RocksDBKeyedStateBackend<Integer> backend : backends) { + IOUtils.closeQuietly(backend); + backend.dispose(); + } + valueStates.clear(); + backends.clear(); + } + + for (boolean useIngest : Arrays.asList(Boolean.TRUE, Boolean.FALSE)) { + for (boolean asyncCompact : Arrays.asList(Boolean.TRUE, Boolean.FALSE)) { + + // Rescale start -> target + rescaleAndRestoreBackends( + useIngest, + asyncCompact, + targetParallelism, + maxParallelism, + startSnapshotResult, + backends); + + backends.forEach( + backend -> 
backend.getAsyncCompactAfterRestoreFuture() + .ifPresent( + future -> { + try { + future.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + })); + + snapshotAllBackends(backends, rescaleSnapshotResult); + + int count = 0; + for (RocksDBKeyedStateBackend backend : backends) { + count += backend.getKeys(stateName, VoidNamespace.INSTANCE).count(); + IOUtils.closeQuietly(backend); + backend.dispose(); + } + Assertions.assertEquals(numKeys, count); + backends.clear(); + cleanupSnapshotResult.addAll(rescaleSnapshotResult); + + // Rescale reverse: target -> start + rescaleAndRestoreBackends( + useIngest, + false, + startParallelism, + maxParallelism, + rescaleSnapshotResult, + backends); + + count = 0; + for (RocksDBKeyedStateBackend backend : backends) { + count += backend.getKeys(stateName, VoidNamespace.INSTANCE).count(); + IOUtils.closeQuietly(backend); + backend.dispose(); + } + Assertions.assertEquals(numKeys, count); + rescaleSnapshotResult.clear(); + backends.clear(); + } + } + } finally { + for (RocksDBKeyedStateBackend backend : backends) { + IOUtils.closeQuietly(backend); + backend.dispose(); + } + for (SnapshotResult snapshotResult : startSnapshotResult) { + snapshotResult.discardState(); + } + for (SnapshotResult snapshotResult : rescaleSnapshotResult) { + snapshotResult.discardState(); + } + for (SnapshotResult snapshotResult : cleanupSnapshotResult) { + snapshotResult.discardState(); + } + } + } + + private void rescaleAndRestoreBackends( + boolean useIngest, + boolean asyncCompactAfterRescale, + int targetParallelism, + int maxParallelism, + List> snapshotResult, + List> backendsOut) + throws IOException { + + List stateHandles = + extractKeyedStateHandlesFromSnapshotResult(snapshotResult); + List ranges = computeKeyGroupRanges(targetParallelism, maxParallelism); + List> handlesByInstance = + computeHandlesByInstance(stateHandles, ranges, targetParallelism); + + OUTPUT.println( + "Restoring using ingestDb=" + + useIngest + + ", asyncCompact=" + + asyncCompactAfterRescale + + "... 
"); + + OUTPUT.println( + "Sum of snapshot sizes: " + + stateHandles.stream().mapToLong(StateObject::getStateSize).sum() + / (1024 * 1024) + + " MB"); + + long maxInstanceTime = Long.MIN_VALUE; + long t = System.currentTimeMillis(); + for (int i = 0; i < targetParallelism; ++i) { + List instanceHandles = handlesByInstance.get(i); + long tInstance = System.currentTimeMillis(); + RocksDBKeyedStateBackend backend = + RocksDBTestUtils.builderForTestDefaults( + TempDirUtils.newFolder(tempFolder), + IntSerializer.INSTANCE, + maxParallelism, + ranges.get(i), + instanceHandles) + .setEnableIncrementalCheckpointing(true) + .setUseIngestDbRestoreMode(useIngest) + .setIncrementalRestoreAsyncCompactAfterRescale(asyncCompactAfterRescale) + .build(); + + long instanceTime = System.currentTimeMillis() - tInstance; + if (instanceTime > maxInstanceTime) { + maxInstanceTime = instanceTime; + } + + OUTPUT.println( + " Restored instance " + + i + + " from " + + instanceHandles.size() + + " state handles" + + " time (ms): " + + instanceTime); + + backendsOut.add(backend); + } + OUTPUT.println("Total restore time (ms): " + (System.currentTimeMillis() - t)); + OUTPUT.println("Max restore time (ms): " + maxInstanceTime); + } + + private void snapshotAllBackends( + List> backends, + List> snapshotResultsOut) + throws Exception { + for (int i = 0; i < backends.size(); ++i) { + RocksDBKeyedStateBackend backend = backends.get(i); + FsCheckpointStreamFactory fsCheckpointStreamFactory = + new FsCheckpointStreamFactory( + getSharedInstance(), + fromLocalFile( + TempDirUtils.newFolder( + tempFolder, "checkpointsDir_" + UUID.randomUUID() + i)), + fromLocalFile( + TempDirUtils.newFolder( + tempFolder, "sharedStateDir_" + UUID.randomUUID() + i)), + 1, + 4096); + + RunnableFuture> snapshot = + backend.snapshot( + 0L, + 0L, + fsCheckpointStreamFactory, + CheckpointOptions.forCheckpointWithDefaultLocation()); + + snapshot.run(); + snapshotResultsOut.add(snapshot.get()); + } + } + + private List extractKeyedStateHandlesFromSnapshotResult( + List> snapshotResults) { + return snapshotResults.stream() + .map(SnapshotResult::getJobManagerOwnedSnapshot) + .collect(Collectors.toList()); + } + + private List computeKeyGroupRanges(int restoreParallelism, int maxParallelism) { + List ranges = new ArrayList<>(restoreParallelism); + for (int i = 0; i < restoreParallelism; ++i) { + ranges.add( + KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex( + maxParallelism, restoreParallelism, i)); + } + return ranges; + } + + private List> computeHandlesByInstance( + List stateHandles, + List computedRanges, + int restoreParallelism) { + List> handlesByInstance = new ArrayList<>(restoreParallelism); + for (KeyGroupRange targetRange : computedRanges) { + List handlesForTargetRange = new ArrayList<>(1); + handlesByInstance.add(handlesForTargetRange); + + for (KeyedStateHandle stateHandle : stateHandles) { + if (stateHandle.getKeyGroupRange().getIntersection(targetRange) + != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) { + handlesForTargetRange.add(stateHandle); + } + } + } + return handlesByInstance; + } +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index 34d27e1bb5915..d54210e727e1c 100644 --- 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 34d27e1bb5915..d54210e727e1c 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -872,6 +872,49 @@ public void testConfigureRestoreOverlapThreshold() {
         assertTrue(0.3 == rocksDBStateBackend.getOverlapFractionThreshold());
     }
 
+    @Test
+    public void testDefaultUseIngestDB() {
+        EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
+        assertEquals(
+                RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE.defaultValue(),
+                rocksDBStateBackend.getUseIngestDbRestoreMode());
+    }
+
+    @Test
+    public void testConfigureUseIngestDB() {
+        EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
+        Configuration configuration = new Configuration();
+        configuration.set(RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, true);
+        rocksDBStateBackend =
+                rocksDBStateBackend.configure(configuration, getClass().getClassLoader());
+        assertTrue(rocksDBStateBackend.getUseIngestDbRestoreMode());
+    }
+
+    @Test
+    public void testDefaultIncrementalRestoreAsyncCompactAfterRescale() {
+        EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
+        assertEquals(
+                RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE
+                        .defaultValue(),
+                rocksDBStateBackend.getIncrementalRestoreAsyncCompactAfterRescale());
+    }
+
+    @Test
+    public void testConfigureIncrementalRestoreAsyncCompactAfterRescale() {
+        EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
+        Configuration configuration = new Configuration();
+        boolean notDefault =
+                !RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE
+                        .defaultValue();
+        configuration.set(
+                RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE,
+                notDefault);
+        rocksDBStateBackend =
+                rocksDBStateBackend.configure(configuration, getClass().getClassLoader());
+        assertEquals(
+                notDefault, rocksDBStateBackend.getIncrementalRestoreAsyncCompactAfterRescale());
+    }
+
     private void verifySetParameter(Runnable setter) {
         try {
             setter.run();
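The four tests above pin down the `configure(...)` round-trip for the two new options. A hedged sketch of the equivalent user-facing setup, reusing only the APIs these tests exercise (the wrapper class is illustrative; `configure(...)` returns a re-configured copy, as the tests assert):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions;

/** Illustrative only: user-facing setup matching the configuration tested above. */
public final class IngestDbConfigSketch {

    public static EmbeddedRocksDBStateBackend create(ClassLoader classLoader) {
        Configuration config = new Configuration();
        // Use RocksDB's ingest/clip-based restore path when rescaling.
        config.set(RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, true);
        // Trigger an async compaction after an incremental rescaling restore.
        config.set(
                RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE, true);

        // configure(...) returns a re-configured copy; the original instance is unchanged.
        return new EmbeddedRocksDBStateBackend(true).configure(config, classLoader);
    }
}
```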
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
index 3177f373f6a42..dd5026da05297 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
@@ -20,7 +20,6 @@
 
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -40,14 +39,11 @@
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Random;
-import java.util.Set;
 import java.util.UUID;
 
-import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -75,8 +71,7 @@ public void testMultiThreadRestoreThreadPoolExceptionRethrow() {
                         stateHandles,
                         stateHandle);
 
-        try (RocksDBStateDownloader rocksDBStateDownloader =
-                new RocksDBStateDownloader(5, (key, value) -> {})) {
+        try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(5)) {
             rocksDBStateDownloader.transferAllStateDataToDirectory(
                     Collections.singletonList(
                             new StateHandleDownloadSpec(
@@ -102,16 +97,11 @@ public void testMultiThreadRestoreCorrectly() throws Exception {
                             temporaryFolder.newFolder().toPath(), contents[i], i));
         }
 
-        Set<String> customMetrics = new HashSet<>();
-
-        try (RocksDBStateDownloader rocksDBStateDownloader =
-                new RocksDBStateDownloader(4, (key, value) -> customMetrics.add(key))) {
+        try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(4)) {
             rocksDBStateDownloader.transferAllStateDataToDirectory(
                     downloadRequests, new CloseableRegistry());
         }
 
-        assertThat(customMetrics).containsExactly(MetricNames.DOWNLOAD_STATE_DURATION);
-
         for (int i = 0; i < numRemoteHandles; ++i) {
             StateHandleDownloadSpec downloadRequest = downloadRequests.get(i);
             Path dstPath = downloadRequest.getDownloadDestination();
@@ -148,8 +138,7 @@ public void testMultiThreadCleanupOnFailure() throws Exception {
                         "error-handle"));
 
         CloseableRegistry closeableRegistry = new CloseableRegistry();
-        try (RocksDBStateDownloader rocksDBStateDownloader =
-                new RocksDBStateDownloader(5, (key, value) -> {})) {
+        try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(5)) {
             rocksDBStateDownloader.transferAllStateDataToDirectory(
                     downloadRequests, closeableRegistry);
             fail("Exception is expected");
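After this change the downloader no longer takes a metrics consumer, only a thread count. A sketch of the resulting call pattern, assuming the same package as the test (visibility of these classes outside `org.apache.flink.contrib.streaming.state` is not guaranteed) and the two-argument `StateHandleDownloadSpec` constructor used in the test above:

```java
package org.apache.flink.contrib.streaming.state;

import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;

import java.nio.file.Path;
import java.util.Collections;

/** Illustrative only: drives the simplified downloader constructor. */
final class DownloaderUsageSketch {

    static void downloadTo(IncrementalRemoteKeyedStateHandle handle, Path destination)
            throws Exception {
        // The thread count is now the only constructor argument; download
        // durations surface through the restore spans instead (see the
        // (Max/Sum)DownloadStateDurationMs attribute in the traces docs).
        try (RocksDBStateDownloader downloader = new RocksDBStateDownloader(4);
                CloseableRegistry registry = new CloseableRegistry()) {
            downloader.transferAllStateDataToDirectory(
                    Collections.singletonList(new StateHandleDownloadSpec(handle, destination)),
                    registry);
        }
    }
}
```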
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
index 2fc862664ceb3..43111df7582f4 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
@@ -28,6 +28,7 @@
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
 import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
@@ -37,8 +38,11 @@
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.RocksDB;
 
+import javax.annotation.Nonnull;
+
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 
 /** Test utils for the RocksDB state backend. */
@@ -50,13 +54,34 @@ public static <K> RocksDBKeyedStateBackendBuilder<K> builderForTestDefaults(
         return builderForTestDefaults(
                 instanceBasePath,
                 keySerializer,
-                EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP);
+                2,
+                new KeyGroupRange(0, 1),
+                Collections.emptyList());
     }
 
     public static <K> RocksDBKeyedStateBackendBuilder<K> builderForTestDefaults(
             File instanceBasePath,
             TypeSerializer<K> keySerializer,
-            EmbeddedRocksDBStateBackend.PriorityQueueStateType queueStateType) {
+            int numKeyGroups,
+            KeyGroupRange keyGroupRange,
+            @Nonnull Collection<KeyedStateHandle> stateHandles) {
+
+        return builderForTestDefaults(
+                instanceBasePath,
+                keySerializer,
+                EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP,
+                numKeyGroups,
+                keyGroupRange,
+                stateHandles);
+    }
+
+    public static <K> RocksDBKeyedStateBackendBuilder<K> builderForTestDefaults(
+            File instanceBasePath,
+            TypeSerializer<K> keySerializer,
+            EmbeddedRocksDBStateBackend.PriorityQueueStateType queueStateType,
+            int numKeyGroups,
+            KeyGroupRange keyGroupRange,
+            @Nonnull Collection<KeyedStateHandle> stateHandles) {
 
         final RocksDBResourceContainer optionsContainer = new RocksDBResourceContainer();
 
@@ -68,8 +93,8 @@ public static <K> RocksDBKeyedStateBackendBuilder<K> builderForTestDefaults(
                 stateName -> optionsContainer.getColumnOptions(),
                 new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()),
                 keySerializer,
-                2,
-                new KeyGroupRange(0, 1),
+                numKeyGroups,
+                keyGroupRange,
                 new ExecutionConfig(),
                 TestLocalRecoveryConfig.disabled(),
                 RocksDBPriorityQueueConfig.buildWithPriorityQueueType(queueStateType),
@@ -77,7 +102,7 @@ public static <K> RocksDBKeyedStateBackendBuilder<K> builderForTestDefaults(
                 LatencyTrackingStateConfig.disabled(),
                 new UnregisteredMetricsGroup(),
                 (key, value) -> {},
-                Collections.emptyList(),
+                stateHandles,
                 UncompressedStreamCompressionDecorator.INSTANCE,
                 new CloseableRegistry());
     }
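The extended `builderForTestDefaults` overloads let a test restore a backend from existing state handles at an arbitrary key-group range, which is what the rescaling test earlier in this PR relies on. A sketch of the intended usage (same package as the utilities assumed; class name and `throws Exception` wrapper are illustrative):

```java
package org.apache.flink.contrib.streaming.state;

import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;

import java.io.File;
import java.util.Collection;

/** Illustrative only: restore a test backend from prior handles via ingest-DB mode. */
final class BuilderUsageSketch {

    static RocksDBKeyedStateBackend<Integer> restore(
            File instanceBasePath,
            int maxParallelism,
            KeyGroupRange range,
            Collection<KeyedStateHandle> handles)
            throws Exception {
        return RocksDBTestUtils.builderForTestDefaults(
                        instanceBasePath,
                        IntSerializer.INSTANCE,
                        maxParallelism, // numKeyGroups == maxParallelism here
                        range,
                        handles)
                .setEnableIncrementalCheckpointing(true)
                .setUseIngestDbRestoreMode(true) // new option introduced by this PR
                .build();
    }
}
```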
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java
index bdc85ef2e5d5d..38b8d3e8c92d2 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java
@@ -23,6 +23,7 @@
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -41,17 +42,29 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 
 /** Tests to guard rescaling from checkpoint. */
+@RunWith(Parameterized.class)
 public class RocksIncrementalCheckpointRescalingTest extends TestLogger {
 
     @Rule public TemporaryFolder rootFolder = new TemporaryFolder();
 
+    @Parameterized.Parameters(name = "useIngestDbRestoreMode: {0}")
+    public static Collection<Boolean> parameters() {
+        return Arrays.asList(false, true);
+    }
+
+    @Parameterized.Parameter public boolean useIngestDbRestoreMode;
+
     private final int maxParallelism = 10;
 
-    private KeySelector<String, String> keySelector = new TestKeySelector();
+    private final KeySelector<String, String> keySelector = new TestKeySelector();
 
     private String[] records;
 
@@ -419,7 +432,12 @@ private KeyedOneInputStreamOperatorTestHarness<String, String, String> getHarne
     }
 
     private StateBackend getStateBackend() throws Exception {
-        return new RocksDBStateBackend("file://" + rootFolder.newFolder().getAbsolutePath(), true);
+        RocksDBStateBackend rocksDBStateBackend =
+                new RocksDBStateBackend("file://" + rootFolder.newFolder().getAbsolutePath(), true);
+        Configuration configuration = new Configuration();
+        configuration.setBoolean(
+                RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, useIngestDbRestoreMode);
+        return rocksDBStateBackend.configure(configuration, getClass().getClassLoader());
     }
 
     /** A simple keyed function for tests. */
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
index 9b95c8aa41cca..ba13faca2940b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
@@ -38,6 +38,7 @@
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -109,23 +110,29 @@ public class AutoRescalingITCase extends TestLogger {
     private static final int slotsPerTaskManager = 2;
     private static final int totalSlots = numTaskManagers * slotsPerTaskManager;
 
-    @Parameterized.Parameters(name = "backend = {0}, buffersPerChannel = {1}")
+    @Parameterized.Parameters(name = "backend = {0}, buffersPerChannel = {1}, useIngestDB = {2}")
     public static Collection<Object[]> data() {
         return Arrays.asList(
                 new Object[][] {
-                    {"rocksdb", 0}, {"rocksdb", 2}, {"filesystem", 0}, {"filesystem", 2}
+                    {"rocksdb", 0, false},
+                    {"rocksdb", 2, true},
+                    {"filesystem", 0, false},
+                    {"filesystem", 2, false}
                 });
     }
 
-    public AutoRescalingITCase(String backend, int buffersPerChannel) {
+    public AutoRescalingITCase(String backend, int buffersPerChannel, boolean useIngestDB) {
         this.backend = backend;
         this.buffersPerChannel = buffersPerChannel;
+        this.useIngestDB = useIngestDB;
     }
 
     private final String backend;
 
     private final int buffersPerChannel;
 
+    private final boolean useIngestDB;
+
     private String currentBackend = null;
 
     enum OperatorCheckpointMethod {
@@ -154,6 +161,7 @@ public void setup() throws Exception {
             final File savepointDir = temporaryFolder.newFolder();
 
             config.set(StateBackendOptions.STATE_BACKEND, currentBackend);
+            config.set(RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, useIngestDB);
             config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
            config.set(CheckpointingOptions.LOCAL_RECOVERY, true);
            config.set(