Skip to content

Commit

Permalink
Add micro benchmark test
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanRRichter committed Jan 25, 2024
1 parent 366a1a5 commit d356083
Show file tree
Hide file tree
Showing 2 changed files with 267 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.contrib.streaming.state;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.IOUtils;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Collectors;

import static org.apache.flink.core.fs.Path.fromLocalFile;
import static org.apache.flink.core.fs.local.LocalFileSystem.getSharedInstance;

/**
 * Rescaling microbenchmark for clip/ingest DB.
 *
 * <p>Every test fills {@code startParallelism} RocksDB keyed state backends with integer value
 * state, takes one snapshot per backend and then builds {@code restoreParallelism} backends from
 * those snapshots twice: once with ingest-DB restore mode switched off and once with it switched
 * on. Wall-clock restore times are printed to standard out; nothing is asserted.
 */
public class RocksDBRecoveryTest {

    @TempDir private static java.nio.file.Path tempFolder;

    @Test
    public void testScaleOut_1_2() throws Exception {
        testRescale(1, 2, 100_000_000, 10);
    }

    @Test
    public void testScaleOut_2_8() throws Exception {
        testRescale(2, 8, 100_000_000, 10);
    }

    @Test
    public void testScaleIn_2_1() throws Exception {
        testRescale(2, 1, 100_000_000, 10);
    }

    @Test
    public void testScaleIn_8_2() throws Exception {
        testRescale(8, 2, 100_000_000, 10);
    }

    /**
     * Runs one rescaling round trip and prints the restore timings.
     *
     * @param startParallelism number of backends that are filled and snapshotted
     * @param restoreParallelism number of backends that are restored from the snapshots
     * @param numKeys number of keys to insert; keys are {@code 0 .. numKeys - 1}
     * @param updateDistance if positive, after every {@code updateDistance}-th insert the key
     *     written {@code updateDistance - 1} inserts earlier is overwritten again; a value
     *     {@code <= 0} disables these extra updates
     * @throws Exception if creating, snapshotting, restoring or cleaning up a backend fails
     */
    public void testRescale(
            int startParallelism, int restoreParallelism, int numKeys, int updateDistance)
            throws Exception {

        System.out.println(
                "Rescaling from " + startParallelism + " to " + restoreParallelism + "...");
        final int maxParallelism = Math.max(startParallelism, restoreParallelism);
        final List<RocksDBKeyedStateBackend<Integer>> backends = new ArrayList<>(maxParallelism);
        final List<SnapshotResult<KeyedStateHandle>> snapshotResults =
                new ArrayList<>(startParallelism);
        try {
            final List<ValueState<Integer>> valueStates = new ArrayList<>(maxParallelism);
            try {
                ValueStateDescriptor<Integer> stateDescriptor =
                        new ValueStateDescriptor<>("TestValueState", IntSerializer.INSTANCE);

                for (int i = 0; i < startParallelism; ++i) {
                    RocksDBKeyedStateBackend<Integer> backend =
                            RocksDBTestUtils.builderForTestDefaults(
                                            TempDirUtils.newFolder(tempFolder),
                                            IntSerializer.INSTANCE,
                                            maxParallelism,
                                            KeyGroupRangeAssignment
                                                    .computeKeyGroupRangeForOperatorIndex(
                                                            maxParallelism, startParallelism, i),
                                            Collections.emptyList())
                                    .setEnableIncrementalCheckpointing(true)
                                    .setUseIngestDbRestoreMode(true)
                                    .build();

                    valueStates.add(
                            backend.getOrCreateKeyedState(
                                    VoidNamespaceSerializer.INSTANCE, stateDescriptor));

                    backends.add(backend);
                }

                System.out.println("Inserting " + numKeys + " keys...");

                for (int i = 0; i < numKeys; ++i) {
                    updateValue(backends, valueStates, i, i, maxParallelism, startParallelism);

                    if (updateDistance > 0 && i % updateDistance == (updateDistance - 1)) {
                        // Overwrite a key that has already been inserted. The offset must be
                        // "+ 1": with "- 1" the very first trigger (i == updateDistance - 1)
                        // produced the key -2, which was never inserted before.
                        updateValue(
                                backends,
                                valueStates,
                                i - updateDistance + 1,
                                i,
                                maxParallelism,
                                startParallelism);
                    }
                }

                System.out.println("Creating snapshots...");

                for (int i = 0; i < backends.size(); ++i) {
                    RocksDBKeyedStateBackend<Integer> backend = backends.get(i);
                    FsCheckpointStreamFactory fsCheckpointStreamFactory =
                            new FsCheckpointStreamFactory(
                                    getSharedInstance(),
                                    fromLocalFile(
                                            TempDirUtils.newFolder(
                                                    tempFolder, "checkpointsDir_" + i)),
                                    fromLocalFile(
                                            TempDirUtils.newFolder(
                                                    tempFolder, "sharedStateDir_" + i)),
                                    1,
                                    4096);

                    RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot =
                            backend.snapshot(
                                    0L,
                                    0L,
                                    fsCheckpointStreamFactory,
                                    CheckpointOptions.forCheckpointWithDefaultLocation());

                    // Run the snapshot synchronously on the test thread before reading it.
                    snapshot.run();
                    snapshotResults.add(snapshot.get());
                }

            } finally {
                // The source backends are only needed to produce the snapshots.
                disposeBackends(backends);
                valueStates.clear();
            }

            List<KeyedStateHandle> stateHandles =
                    snapshotResults.stream()
                            .map(SnapshotResult::getJobManagerOwnedSnapshot)
                            .collect(Collectors.toList());

            System.out.println(
                    "Sum of snapshot sizes: "
                            + stateHandles.stream().mapToLong(StateObject::getStateSize).sum()
                                    / (1024 * 1024)
                            + " MB");

            List<KeyGroupRange> ranges = new ArrayList<>(restoreParallelism);
            for (int i = 0; i < restoreParallelism; ++i) {
                ranges.add(
                        KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
                                maxParallelism, restoreParallelism, i));
            }

            // For every restored instance, collect the snapshots that overlap its key-groups.
            List<List<KeyedStateHandle>> handlesByInstance = new ArrayList<>(restoreParallelism);
            for (KeyGroupRange targetRange : ranges) {
                List<KeyedStateHandle> handlesForTargetRange = new ArrayList<>(1);
                handlesByInstance.add(handlesForTargetRange);

                for (KeyedStateHandle stateHandle : stateHandles) {
                    // NOTE(review): identity comparison presumes getIntersection returns the
                    // shared EMPTY_KEY_GROUP_RANGE constant for disjoint ranges - confirm.
                    if (stateHandle.getKeyGroupRange().getIntersection(targetRange)
                            != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
                        handlesForTargetRange.add(stateHandle);
                    }
                }
            }

            for (boolean useIngest : Arrays.asList(Boolean.FALSE, Boolean.TRUE)) {
                System.out.println("Restoring using ingest db=" + useIngest + "... ");
                long t = System.currentTimeMillis();
                for (int i = 0; i < restoreParallelism; ++i) {
                    List<KeyedStateHandle> instanceHandles = handlesByInstance.get(i);
                    long tInstance = System.currentTimeMillis();
                    RocksDBKeyedStateBackend<Integer> backend =
                            RocksDBTestUtils.builderForTestDefaults(
                                            TempDirUtils.newFolder(tempFolder),
                                            IntSerializer.INSTANCE,
                                            maxParallelism,
                                            ranges.get(i),
                                            instanceHandles)
                                    .setEnableIncrementalCheckpointing(true)
                                    .setUseIngestDbRestoreMode(useIngest)
                                    .build();
                    System.out.println(
                            "    Restored instance "
                                    + i
                                    + " from "
                                    + instanceHandles.size()
                                    + " state handles"
                                    + " time (ms): "
                                    + (System.currentTimeMillis() - tInstance));
                    backends.add(backend);
                }
                System.out.println("Total restore time (ms): " + (System.currentTimeMillis() - t));

                // Release the instances of this pass (outside the timed section) so that they
                // do not stay open while the next restore mode is measured.
                disposeBackends(backends);
            }

        } finally {
            // Covers backends that are still open because an earlier step failed.
            disposeBackends(backends);
            for (SnapshotResult<KeyedStateHandle> snapshotResult : snapshotResults) {
                snapshotResult.discardState();
            }
        }
    }

    /**
     * Writes {@code value} for {@code key} into the backend that {@link
     * KeyGroupRangeAssignment#assignKeyToParallelOperator} selects for the key.
     */
    private static void updateValue(
            List<RocksDBKeyedStateBackend<Integer>> backends,
            List<ValueState<Integer>> valueStates,
            int key,
            int value,
            int maxParallelism,
            int parallelism)
            throws Exception {
        final int index =
                KeyGroupRangeAssignment.assignKeyToParallelOperator(
                        key, maxParallelism, parallelism);
        backends.get(index).setCurrentKey(key);
        valueStates.get(index).update(value);
    }

    /** Closes and disposes all given backends and then empties the list. */
    private static void disposeBackends(List<RocksDBKeyedStateBackend<Integer>> backends) {
        for (RocksDBKeyedStateBackend<Integer> backend : backends) {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
        backends.clear();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
Expand All @@ -37,8 +38,11 @@
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.RocksDB;

import javax.annotation.Nonnull;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;

/** Test utils for the RocksDB state backend. */
Expand All @@ -50,13 +54,34 @@ public static <K> RocksDBKeyedStateBackendBuilder<K> builderForTestDefaults(
return builderForTestDefaults(
instanceBasePath,
keySerializer,
EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP);
2,
new KeyGroupRange(0, 1),
Collections.emptyList());
}

public static <K> RocksDBKeyedStateBackendBuilder<K> builderForTestDefaults(
File instanceBasePath,
TypeSerializer<K> keySerializer,
EmbeddedRocksDBStateBackend.PriorityQueueStateType queueStateType) {
int numKeyGroups,
KeyGroupRange keyGroupRange,
@Nonnull Collection<KeyedStateHandle> stateHandles) {

return builderForTestDefaults(
instanceBasePath,
keySerializer,
EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP,
numKeyGroups,
keyGroupRange,
stateHandles);
}

public static <K> RocksDBKeyedStateBackendBuilder<K> builderForTestDefaults(
File instanceBasePath,
TypeSerializer<K> keySerializer,
EmbeddedRocksDBStateBackend.PriorityQueueStateType queueStateType,
int numKeyGroups,
KeyGroupRange keyGroupRange,
@Nonnull Collection<KeyedStateHandle> stateHandles) {

final RocksDBResourceContainer optionsContainer = new RocksDBResourceContainer();

Expand All @@ -68,16 +93,16 @@ public static <K> RocksDBKeyedStateBackendBuilder<K> builderForTestDefaults(
stateName -> optionsContainer.getColumnOptions(),
new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()),
keySerializer,
2,
new KeyGroupRange(0, 1),
numKeyGroups,
keyGroupRange,
new ExecutionConfig(),
TestLocalRecoveryConfig.disabled(),
RocksDBPriorityQueueConfig.buildWithPriorityQueueType(queueStateType),
TtlTimeProvider.DEFAULT,
LatencyTrackingStateConfig.disabled(),
new UnregisteredMetricsGroup(),
(key, value) -> {},
Collections.emptyList(),
stateHandles,
UncompressedStreamCompressionDecorator.INSTANCE,
new CloseableRegistry());
}
Expand Down

0 comments on commit d356083

Please sign in to comment.