Skip to content

Commit

Permalink
[FLINK-34199] Add tracing for durations of rescaling/restoring (from …
Browse files Browse the repository at this point in the history
…local and downloaded remote state).
  • Loading branch information
StefanRRichter committed Feb 12, 2024
1 parent 8533c88 commit fc4c962
Show file tree
Hide file tree
Showing 14 changed files with 280 additions and 83 deletions.
42 changes: 42 additions & 0 deletions flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,21 @@ private CollectionUtil() {
throw new AssertionError();
}

/** Returns true if the given collection is null or empty. */
public static boolean isNullOrEmpty(Collection<?> collection) {
return collection == null || collection.isEmpty();
}

/** Returns true if the given collection is empty or contains only null elements. */
public static boolean isEmptyOrAllElementsNull(Collection<?> collection) {
for (Object o : collection) {
if (o != null) {
return false;
}
}
return true;
}

public static boolean isNullOrEmpty(Map<?, ?> map) {
return map == null || map.isEmpty();
}
Expand Down Expand Up @@ -214,4 +225,35 @@ static int computeRequiredCapacity(int expectedSize, float loadFactor) {
? (int) Math.ceil(expectedSize / loadFactor)
: Integer.MAX_VALUE;
}

/**
* Casts the given collection to a subtype. This is an unchecked cast that can lead to runtime
* exceptions.
*
* @param collection the collection to cast.
* @return the collection unchecked-cast to a subtype.
* @param <T> the subtype to cast to.
*/
public static <T> Collection<T> subTypeCast(Collection<? super T> collection) {
@SuppressWarnings("unchecked")
Collection<T> result = (Collection<T>) collection;
return result;
}

/**
* Casts the given collection to a subtype. This is a checked cast.
*
* @param collection the collection to cast.
* @param subTypeClass the class of the subtype to cast to.
* @return the collection checked and cast to a subtype.
* @param <T> the subtype to cast to.
*/
public static <T> Collection<T> checkedSubTypeCast(
Collection<? super T> collection, Class<T> subTypeClass) {
for (Object o : collection) {
// probe each object, will throw ClassCastException on mismatch.
subTypeClass.cast(o);
}
return subTypeCast(collection);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import static org.apache.flink.util.CollectionUtil.HASH_MAP_DEFAULT_LOAD_FACTOR;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

/** Tests for java collection utilities. */
@ExtendWith(TestLoggerExtension.class)
Expand Down Expand Up @@ -107,4 +111,47 @@ public void testComputeCapacity() {
} catch (IllegalArgumentException expected) {
}
}

@Test
public void testIsEmptyOrAllElementsNull() {
Assertions.assertTrue(CollectionUtil.isEmptyOrAllElementsNull(Collections.emptyList()));
Assertions.assertTrue(
CollectionUtil.isEmptyOrAllElementsNull(Collections.singletonList(null)));
Assertions.assertTrue(CollectionUtil.isEmptyOrAllElementsNull(Arrays.asList(null, null)));
Assertions.assertFalse(
CollectionUtil.isEmptyOrAllElementsNull(Collections.singletonList("test")));
Assertions.assertFalse(
CollectionUtil.isEmptyOrAllElementsNull(Arrays.asList(null, "test")));
Assertions.assertFalse(
CollectionUtil.isEmptyOrAllElementsNull(Arrays.asList("test", null)));
Assertions.assertFalse(
CollectionUtil.isEmptyOrAllElementsNull(Arrays.asList(null, "test", null)));
}

@Test
public void testCheckedSubTypeCast() {
List<A> list = new ArrayList<>();
B b = new B();
C c = new C();
list.add(b);
list.add(c);
list.add(null);
Collection<B> castSuccess = CollectionUtil.checkedSubTypeCast(list, B.class);
Iterator<B> iterator = castSuccess.iterator();
Assertions.assertEquals(b, iterator.next());
Assertions.assertEquals(c, iterator.next());
Assertions.assertNull(iterator.next());
Assertions.assertFalse(iterator.hasNext());
try {
Collection<C> castFail = CollectionUtil.checkedSubTypeCast(list, C.class);
fail("Expected ClassCastException");
} catch (ClassCastException expected) {
}
}

static class A {}

static class B extends A {}

static class C extends B {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ private MetricNames() {}
public static final String INITIALIZE_STATE_DURATION = "InitializeStateDurationMs";
public static final String GATE_RESTORE_DURATION = "GateRestoreDurationMs";
public static final String DOWNLOAD_STATE_DURATION = "DownloadStateDurationMs";
public static final String RESTORE_STATE_DURATION = "RestoreStateDurationMs";
public static final String RESTORED_STATE_SIZE = "RestoredStateSizeBytes";
public static final String RESTORE_ASYNC_COMPACTION_DURATION =
"RestoreAsyncCompactionDurationMs";

public static final String START_WORKER_FAILURE_RATE = "startWorkFailure" + SUFFIX_RATE;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ public String toString() {
+ delegate
+ ", offsets="
+ offsets
+ ", size="
+ size
+ '}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ public int hashCode() {

@Override
public String toString() {
return "DirectoryStateHandle{" + "directory=" + directoryString + '}';
return "DirectoryStateHandle{"
+ "directory='"
+ directoryString
+ '\''
+ ", directorySize="
+ directorySize
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,5 +100,16 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(handle, localPath);
}

@Override
public String toString() {
return "HandleAndLocalPath{"
+ "handle="
+ handle
+ ", localPath='"
+ localPath
+ '\''
+ '}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ public String toString() {
+ '}';
}

public String prettyPrintInterval() {
return "[" + startKeyGroup + ", " + endKeyGroup + "]";
}

@Override
public Iterator<Integer> iterator() {
return new KeyGroupIterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
Expand Down Expand Up @@ -217,13 +218,19 @@ protected <K> CheckpointableKeyedStateBackend<K> createKeyedBackend(
keyGroupRange,
env.getTaskKvStateRegistry(),
TtlTimeProvider.DEFAULT,
new UnregisteredMetricsGroup(),
getMetricGroup(),
getCustomInitializationMetrics(),
Collections.emptyList(),
new CloseableRegistry()));
new CloseableRegistry(),
1.0d));

return backend;
}

protected StateBackend.CustomInitializationMetrics getCustomInitializationMetrics() {
return (name, value) -> {};
}

protected <K> CheckpointableKeyedStateBackend<K> restoreKeyedBackend(
TypeSerializer<K> keySerializer, KeyedStateHandle state) throws Exception {
return restoreKeyedBackend(keySerializer, state, env);
Expand Down Expand Up @@ -255,9 +262,15 @@ protected <K> CheckpointableKeyedStateBackend<K> restoreKeyedBackend(
keyGroupRange,
env.getTaskKvStateRegistry(),
TtlTimeProvider.DEFAULT,
new UnregisteredMetricsGroup(),
getMetricGroup(),
getCustomInitializationMetrics(),
state,
new CloseableRegistry()));
new CloseableRegistry(),
1.0d));
}

protected MetricGroup getMetricGroup() {
return new UnregisteredMetricsGroup();
}

@TestTemplate
Expand All @@ -283,9 +296,11 @@ void testEnableStateLatencyTracking() throws Exception {
groupRange,
kvStateRegistry,
TtlTimeProvider.DEFAULT,
new UnregisteredMetricsGroup(),
getMetricGroup(),
getCustomInitializationMetrics(),
Collections.emptyList(),
cancelStreamRegistry));
cancelStreamRegistry,
1.0d));
try {
KeyedStateBackend<Integer> nested =
keyedStateBackend instanceof TestableKeyedStateBackend
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,9 @@ public void testMaterializedRestorePriorityQueue() throws Exception {
ChangelogStateBackendTestUtils.testMaterializedRestoreForPriorityQueue(
getStateBackend(), env, streamFactory);
}

@Override
protected boolean checkMetrics() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -471,7 +472,7 @@ private RocksDBRestoreOperation getRocksDBRestoreOperation(
LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) {
DBOptions dbOptions = optionsContainer.getDbOptions();
if (restoreStateHandles.isEmpty()) {
if (CollectionUtil.isEmptyOrAllElementsNull(restoreStateHandles)) {
return new RocksDBNoneRestoreOperation<>(
kvStateInformation,
instanceRocksDBPath,
Expand Down Expand Up @@ -500,7 +501,8 @@ private RocksDBRestoreOperation getRocksDBRestoreOperation(
nativeMetricOptions,
metricGroup,
customInitializationMetrics,
restoreStateHandles,
CollectionUtil.checkedSubTypeCast(
restoreStateHandles, IncrementalKeyedStateHandle.class),
ttlCompactFiltersManager,
writeBatchSize,
optionsContainer.getWriteBufferManagerCapacity(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@

import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.state.StateBackend.CustomInitializationMetrics;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.clock.SystemClock;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.ThrowingRunnable;

Expand All @@ -39,17 +37,11 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.runtime.metrics.MetricNames.DOWNLOAD_STATE_DURATION;

/** Help class for downloading RocksDB state files. */
public class RocksDBStateDownloader extends RocksDBStateDataTransfer {

private final CustomInitializationMetrics customInitializationMetrics;

public RocksDBStateDownloader(
int restoringThreadNum, CustomInitializationMetrics customInitializationMetrics) {
public RocksDBStateDownloader(int restoringThreadNum) {
super(restoringThreadNum);
this.customInitializationMetrics = customInitializationMetrics;
}

/**
Expand All @@ -68,15 +60,11 @@ public void transferAllStateDataToDirectory(
// Make sure we also react to external close signals.
closeableRegistry.registerCloseable(internalCloser);
try {
long startTimeMs = SystemClock.getInstance().relativeTimeMillis();
List<CompletableFuture<Void>> futures =
transferAllStateDataToDirectoryAsync(downloadRequests, internalCloser)
.collect(Collectors.toList());
// Wait until either all futures completed successfully or one failed exceptionally.
FutureUtils.completeAll(futures).get();
customInitializationMetrics.addMetric(
DOWNLOAD_STATE_DURATION,
SystemClock.getInstance().relativeTimeMillis() - startTimeMs);
} catch (Exception e) {
downloadRequests.stream()
.map(StateHandleDownloadSpec::getDownloadDestination)
Expand Down
Loading

0 comments on commit fc4c962

Please sign in to comment.