[FLINK-31238] Use IngestDB to speed up Rocksdb rescaling recovery (rebased) #24031

Merged
14 changes: 11 additions & 3 deletions docs/content.zh/docs/ops/traces.md
@@ -97,7 +97,7 @@ Flink reports a single span trace for the whole checkpoint and job initialization
</thead>
<tbody>
<tr>
<th rowspan="16">org.apache.flink.</br>runtime.checkpoint.</br>CheckpointStatsTracker</th>
<th rowspan="18">org.apache.flink.</br>runtime.checkpoint.</br>CheckpointStatsTracker</th>
<th rowspan="6"><strong>Checkpoint</strong></th>
<td>startTs</td>
<td>Timestamp when the checkpoint has started.</td>
@@ -123,7 +123,7 @@ Flink reports a single span trace for the whole checkpoint and job initialization
<td>What was the state of this checkpoint: FAILED or COMPLETED.</td>
</tr>
<tr>
<th rowspan="10"><strong>JobInitialization</strong></th>
<th rowspan="12"><strong>JobInitialization</strong></th>
<td>startTs</td>
<td>Timestamp when the job initialization has started.</td>
</tr>
@@ -157,7 +157,7 @@ Flink reports a single span trace for the whole checkpoint and job initialization
</tr>
<tr>
<td>(Max/Sum)DownloadStateDurationMs<br><br>(optional - currently only supported by RocksDB Incremental)</td>
<td>The aggregated (max and sum) across all subtasks duration of downloading state files from the DFS.</td>
<td>The aggregated (max and sum) duration across all subtasks of downloading state files from the DFS.</td>
</tr>
<tr>
<td>(Max/Sum)RestoreStateDurationMs<br><br>(optional - currently only supported by RocksDB Incremental)</td>
<td>The aggregated (max and sum) duration across all subtasks of restoring the state backend from fully localized state, i.e. after all remote state was downloaded.</td>
</tr>
<tr>
<td>(Max/Sum)RestoredStateSizeBytes.[location]</td>
@@ -167,6 +171,10 @@ Flink reports a single span trace for the whole checkpoint and job initialization
REMOTE,
UNKNOWN.</td>
</tr>
<tr>
<td>(Max/Sum)RestoreAsyncCompactionDurationMs<br><br>(optional - currently only supported by RocksDB Incremental)</td>
<td>The aggregated (max and sum) duration across all subtasks for async compaction after incremental restore.</td>
</tr>
</tbody>
</table>

14 changes: 11 additions & 3 deletions docs/content/docs/ops/traces.md
@@ -97,7 +97,7 @@ Flink reports a single span trace for the whole checkpoint and job initialization
</thead>
<tbody>
<tr>
<th rowspan="16">org.apache.flink.</br>runtime.checkpoint.</br>CheckpointStatsTracker</th>
<th rowspan="18">org.apache.flink.</br>runtime.checkpoint.</br>CheckpointStatsTracker</th>
<th rowspan="6"><strong>Checkpoint</strong></th>
<td>startTs</td>
<td>Timestamp when the checkpoint has started.</td>
@@ -123,7 +123,7 @@ Flink reports a single span trace for the whole checkpoint and job initialization
<td>What was the state of this checkpoint: FAILED or COMPLETED.</td>
</tr>
<tr>
<th rowspan="10"><strong>JobInitialization</strong></th>
<th rowspan="12"><strong>JobInitialization</strong></th>
<td>startTs</td>
<td>Timestamp when the job initialization has started.</td>
</tr>
@@ -157,7 +157,7 @@ Flink reports a single span trace for the whole checkpoint and job initialization
</tr>
<tr>
<td>(Max/Sum)DownloadStateDurationMs<br><br>(optional - currently only supported by RocksDB Incremental)</td>
<td>The aggregated (max and sum) across all subtasks duration of downloading state files from the DFS.</td>
<td>The aggregated (max and sum) duration across all subtasks of downloading state files from the DFS.</td>
</tr>
<tr>
<td>(Max/Sum)RestoreStateDurationMs<br><br>(optional - currently only supported by RocksDB Incremental)</td>
<td>The aggregated (max and sum) duration across all subtasks of restoring the state backend from fully localized state, i.e. after all remote state was downloaded.</td>
</tr>
<tr>
<td>(Max/Sum)RestoredStateSizeBytes.[location]</td>
@@ -167,6 +171,10 @@ Flink reports a single span trace for the whole checkpoint and job initialization
REMOTE,
UNKNOWN.</td>
</tr>
<tr>
<td>(Max/Sum)RestoreAsyncCompactionDurationMs<br><br>(optional - currently only supported by RocksDB Incremental)</td>
<td>The aggregated (max and sum) duration across all subtasks for async compaction after incremental restore.</td>
</tr>
</tbody>
</table>

@@ -74,6 +74,12 @@
<td>Integer</td>
<td>The maximum number of open files (per stateful operator) that can be used by the DB, '-1' means no limit. The default value is '-1'.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.incremental-restore-async-compact-after-rescale</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If true, an async compaction of RocksDB is started after every restore in which we detect keys (including tombstones) in the database that are outside the key-group range of the backend.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.log.dir</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -116,6 +122,12 @@
<td>Boolean</td>
<td>If true, every newly created SST file will contain a Bloom filter. It is disabled by default.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.use-ingest-db-restore-mode</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>A recovery mode that directly clips and ingests multiple DBs during state recovery if the keys in the SST files do not exceed the declared key-group range.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.write-batch-size</h5></td>
<td style="word-wrap: break-word;">2 mb</td>
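The two new options documented in this hunk cooperate during rescaling recovery: one switches restore to the clip-and-ingest path, the other schedules an async compaction afterwards to clean out-of-range keys. A minimal sketch of enabling both, assuming a standard `flink-conf.yaml` deployment (the surrounding `state.backend.*` keys are this example's assumptions, not part of the PR):

```yaml
# Sketch: RocksDB incremental checkpoints with the new restore options enabled
state.backend.type: rocksdb
state.backend.incremental: true
# Directly clip and ingest multiple DBs during state recovery (new in this PR)
state.backend.rocksdb.use-ingest-db-restore-mode: true
# Start an async compaction after a restore that leaves out-of-range keys (new in this PR)
state.backend.rocksdb.incremental-restore-async-compact-after-rescale: true
```

Both options default to `false`, so existing jobs keep the current restore behavior unless they opt in.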
42 changes: 42 additions & 0 deletions flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java
@@ -56,10 +56,21 @@ private CollectionUtil() {
throw new AssertionError();
}

/** Returns true if the given collection is null or empty. */
public static boolean isNullOrEmpty(Collection<?> collection) {
return collection == null || collection.isEmpty();
}

/** Returns true if the given collection is empty or contains only null elements. */
public static boolean isEmptyOrAllElementsNull(Collection<?> collection) {
for (Object o : collection) {
if (o != null) {
return false;
}
}
return true;
}

public static boolean isNullOrEmpty(Map<?, ?> map) {
return map == null || map.isEmpty();
}
@@ -214,4 +225,35 @@ static int computeRequiredCapacity(int expectedSize, float loadFactor) {
? (int) Math.ceil(expectedSize / loadFactor)
: Integer.MAX_VALUE;
}

/**
* Casts the given collection to a subtype. This is an unchecked cast that can lead to runtime
* exceptions.
*
* @param collection the collection to cast.
* @return the collection unchecked-cast to a subtype.
* @param <T> the subtype to cast to.
*/
public static <T> Collection<T> subTypeCast(Collection<? super T> collection) {
@SuppressWarnings("unchecked")
Collection<T> result = (Collection<T>) collection;
return result;
}

/**
* Casts the given collection to a subtype. This is a checked cast.
*
* @param collection the collection to cast.
* @param subTypeClass the class of the subtype to cast to.
* @return the collection checked and cast to a subtype.
* @param <T> the subtype to cast to.
*/
public static <T> Collection<T> checkedSubTypeCast(
Collection<? super T> collection, Class<T> subTypeClass) {
for (Object o : collection) {
// probe each object, will throw ClassCastException on mismatch.
subTypeClass.cast(o);
}
return subTypeCast(collection);
}
}
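The new cast helpers can also be exercised in isolation. The sketch below re-implements `checkedSubTypeCast` standalone (the `Animal`/`Dog` names are illustrative, not from the PR) to show the probe-then-cast behavior: every element is probed with `Class#cast` before the single unchecked narrowing cast, and `null` elements pass the probe, matching the PR's own test:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class SubTypeCastDemo {
    static class Animal {}
    static class Dog extends Animal {}

    // Standalone re-implementation of the PR's CollectionUtil.checkedSubTypeCast.
    static <T> Collection<T> checkedSubTypeCast(
            Collection<? super T> collection, Class<T> subTypeClass) {
        for (Object o : collection) {
            // Class#cast throws ClassCastException on mismatch; null passes through.
            subTypeClass.cast(o);
        }
        @SuppressWarnings("unchecked")
        Collection<T> result = (Collection<T>) collection;
        return result;
    }

    public static void main(String[] args) {
        List<Animal> animals = new ArrayList<>();
        animals.add(new Dog());
        animals.add(null);
        Collection<Dog> dogs = checkedSubTypeCast(animals, Dog.class);
        System.out.println(dogs.size()); // prints 2: the Dog and the null element

        try {
            List<Animal> mixed = new ArrayList<>(List.of(new Animal()));
            checkedSubTypeCast(mixed, Dog.class); // Animal is not a Dog
        } catch (ClassCastException expected) {
            System.out.println("mismatch rejected");
        }
    }
}
```

The checked variant trades an O(n) probe pass for fail-fast behavior at the cast site, instead of a `ClassCastException` surfacing later at an arbitrary read.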
@@ -22,12 +22,16 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import static org.apache.flink.util.CollectionUtil.HASH_MAP_DEFAULT_LOAD_FACTOR;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

/** Tests for java collection utilities. */
@ExtendWith(TestLoggerExtension.class)
@@ -107,4 +111,47 @@ public void testComputeCapacity() {
} catch (IllegalArgumentException expected) {
}
}

@Test
public void testIsEmptyOrAllElementsNull() {
Assertions.assertTrue(CollectionUtil.isEmptyOrAllElementsNull(Collections.emptyList()));
Assertions.assertTrue(
CollectionUtil.isEmptyOrAllElementsNull(Collections.singletonList(null)));
Assertions.assertTrue(CollectionUtil.isEmptyOrAllElementsNull(Arrays.asList(null, null)));
Assertions.assertFalse(
CollectionUtil.isEmptyOrAllElementsNull(Collections.singletonList("test")));
Assertions.assertFalse(
CollectionUtil.isEmptyOrAllElementsNull(Arrays.asList(null, "test")));
Assertions.assertFalse(
CollectionUtil.isEmptyOrAllElementsNull(Arrays.asList("test", null)));
Assertions.assertFalse(
CollectionUtil.isEmptyOrAllElementsNull(Arrays.asList(null, "test", null)));
}

@Test
public void testCheckedSubTypeCast() {
List<A> list = new ArrayList<>();
B b = new B();
C c = new C();
list.add(b);
list.add(c);
list.add(null);
Collection<B> castSuccess = CollectionUtil.checkedSubTypeCast(list, B.class);
Iterator<B> iterator = castSuccess.iterator();
Assertions.assertEquals(b, iterator.next());
Assertions.assertEquals(c, iterator.next());
Assertions.assertNull(iterator.next());
Assertions.assertFalse(iterator.hasNext());
try {
Collection<C> castFail = CollectionUtil.checkedSubTypeCast(list, C.class);
fail("Expected ClassCastException");
} catch (ClassCastException expected) {
}
}

static class A {}

static class B extends A {}

static class C extends B {}
}
@@ -69,7 +69,10 @@ private MetricNames() {}
public static final String INITIALIZE_STATE_DURATION = "InitializeStateDurationMs";
public static final String GATE_RESTORE_DURATION = "GateRestoreDurationMs";
public static final String DOWNLOAD_STATE_DURATION = "DownloadStateDurationMs";
public static final String RESTORE_STATE_DURATION = "RestoreStateDurationMs";
public static final String RESTORED_STATE_SIZE = "RestoredStateSizeBytes";
public static final String RESTORE_ASYNC_COMPACTION_DURATION =
"RestoreAsyncCompactionDurationMs";

public static final String START_WORKER_FAILURE_RATE = "startWorkFailure" + SUFFIX_RATE;

@@ -129,6 +129,8 @@ public String toString() {
+ delegate
+ ", offsets="
+ offsets
+ ", size="
+ size
+ '}';
}

@@ -34,7 +34,7 @@

/** An abstract base implementation of the {@link StateBackendBuilder} interface. */
public abstract class AbstractKeyedStateBackendBuilder<K>
implements StateBackendBuilder<AbstractKeyedStateBackend, BackendBuildingException> {
implements StateBackendBuilder<AbstractKeyedStateBackend<K>, BackendBuildingException> {
protected final Logger logger = LoggerFactory.getLogger(getClass());

protected final TaskKvStateRegistry kvStateRegistry;
@@ -109,6 +109,12 @@ public int hashCode() {

@Override
public String toString() {
return "DirectoryStateHandle{" + "directory=" + directoryString + '}';
return "DirectoryStateHandle{"
+ "directory='"
+ directoryString
+ '\''
+ ", directorySize="
+ directorySize
+ '}';
}
}
@@ -100,5 +100,16 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(handle, localPath);
}

@Override
public String toString() {
return "HandleAndLocalPath{"
+ "handle="
+ handle
+ ", localPath='"
+ localPath
+ '\''
+ '}';
}
}
}
@@ -135,6 +135,10 @@ public String toString() {
+ '}';
}

public String prettyPrintInterval() {
return "[" + startKeyGroup + ", " + endKeyGroup + "]";
}

@Override
public Iterator<Integer> iterator() {
return new KeyGroupIterator();
@@ -53,6 +53,7 @@
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
@@ -217,13 +218,19 @@ protected <K> CheckpointableKeyedStateBackend<K> createKeyedBackend(
keyGroupRange,
env.getTaskKvStateRegistry(),
TtlTimeProvider.DEFAULT,
new UnregisteredMetricsGroup(),
getMetricGroup(),
getCustomInitializationMetrics(),
Collections.emptyList(),
new CloseableRegistry()));
new CloseableRegistry(),
1.0d));

return backend;
}

protected StateBackend.CustomInitializationMetrics getCustomInitializationMetrics() {
return (name, value) -> {};
}

protected <K> CheckpointableKeyedStateBackend<K> restoreKeyedBackend(
TypeSerializer<K> keySerializer, KeyedStateHandle state) throws Exception {
return restoreKeyedBackend(keySerializer, state, env);
Expand Down Expand Up @@ -255,9 +262,15 @@ protected <K> CheckpointableKeyedStateBackend<K> restoreKeyedBackend(
keyGroupRange,
env.getTaskKvStateRegistry(),
TtlTimeProvider.DEFAULT,
new UnregisteredMetricsGroup(),
getMetricGroup(),
getCustomInitializationMetrics(),
state,
new CloseableRegistry()));
new CloseableRegistry(),
1.0d));
}

protected MetricGroup getMetricGroup() {
return new UnregisteredMetricsGroup();
}

@TestTemplate
Expand All @@ -283,9 +296,11 @@ void testEnableStateLatencyTracking() throws Exception {
groupRange,
kvStateRegistry,
TtlTimeProvider.DEFAULT,
new UnregisteredMetricsGroup(),
getMetricGroup(),
getCustomInitializationMetrics(),
Collections.emptyList(),
cancelStreamRegistry));
cancelStreamRegistry,
1.0d));
try {
KeyedStateBackend<Integer> nested =
keyedStateBackend instanceof TestableKeyedStateBackend
@@ -123,4 +123,9 @@ public void testMaterializedRestorePriorityQueue() throws Exception {
ChangelogStateBackendTestUtils.testMaterializedRestoreForPriorityQueue(
getStateBackend(), env, streamFactory);
}

@Override
protected boolean checkMetrics() {
return false;
}
}