Skip to content

Commit

Permalink
[FLINK-35780][state] Support state migration from disabling to enabli…
Browse files Browse the repository at this point in the history
…ng ttl in RocksDBState
  • Loading branch information
xiangyuf committed Sep 15, 2024
1 parent 25969c9 commit b8da423
Show file tree
Hide file tree
Showing 19 changed files with 555 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.runtime.state.ttl.TtlStateFactory;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnull;
Expand Down Expand Up @@ -294,7 +295,7 @@ private static class LazilyRegisteredStateSerializerProvider<T>

@Nonnull
@Override
@SuppressWarnings("ConstantConditions")
@SuppressWarnings({"ConstantConditions", "unchecked"})
public TypeSerializerSchemaCompatibility<T> registerNewSerializerForRestoredState(
TypeSerializer<T> newSerializer) {
checkNotNull(newSerializer);
Expand All @@ -303,8 +304,26 @@ public TypeSerializerSchemaCompatibility<T> registerNewSerializerForRestoredStat
"A serializer has already been registered for the state; re-registration is not allowed.");
}

TypeSerializer<T> originalSerializer = newSerializer;
if (previousSerializerSnapshot != null) {
TypeSerializer<T> previousSerializer = previousSchemaSerializer();
if (TtlStateFactory.TtlSerializer.isMigrateFromDisablingToEnabling(
previousSerializer, newSerializer)) {
originalSerializer =
(TypeSerializer<T>)
TtlStateFactory.TtlSerializer.extractOriginalTypeSerializer(
newSerializer);
} else if (TtlStateFactory.TtlSerializer.isMigrateFromEnablingToDisabling(
previousSerializer, newSerializer)) {
previousSerializer =
(TypeSerializer<T>)
TtlStateFactory.TtlSerializer.extractOriginalTypeSerializer(
previousSerializer);
previousSerializerSnapshot = previousSerializer.snapshotConfiguration();
}
}
TypeSerializerSchemaCompatibility<T> result =
newSerializer
originalSerializer
.snapshotConfiguration()
.resolveSchemaCompatibility(previousSerializerSnapshot);
if (result.isIncompatible()) {
Expand All @@ -325,6 +344,12 @@ public TypeSerializerSchemaCompatibility<T> setPreviousSerializerSnapshotForRest
throw new UnsupportedOperationException(
"The snapshot of the state's previous serializer has already been set; cannot reset.");
}

private boolean isTtlMigration(TypeSerializer<T> newSerializer) {
return previousSerializerSnapshot != null
&& TtlStateFactory.TtlSerializer.isTtlStateMigration(
previousSchemaSerializer(), newSerializer);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,10 @@ private <N, V> StateTable<K, N, V> tryRegisterStateTable(
TypeSerializer<V> previousStateSerializer = restoredKvMetaInfo.getStateSerializer();

TypeSerializerSchemaCompatibility<V> stateCompatibility =
restoredKvMetaInfo.updateStateSerializer(newStateSerializer);
newStateSerializer
.snapshotConfiguration()
.resolveSchemaCompatibility(
previousStateSerializer.snapshotConfiguration());

if (stateCompatibility.isIncompatible()) {
throw new StateMigrationException(
Expand All @@ -252,6 +255,7 @@ private <N, V> StateTable<K, N, V> tryRegisterStateTable(
+ previousStateSerializer
+ ").");
}
restoredKvMetaInfo.updateStateSerializer(newStateSerializer);

restoredKvMetaInfo =
allowFutureMetadataUpdates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,12 +314,12 @@ protected CompositeSerializer<TtlValue<T>> createSerializerInstance(
}

@SuppressWarnings("unchecked")
TypeSerializer<Long> getTimestampSerializer() {
public TypeSerializer<Long> getTimestampSerializer() {
return (TypeSerializer<Long>) (TypeSerializer<?>) fieldSerializers[0];
}

@SuppressWarnings("unchecked")
TypeSerializer<T> getValueSerializer() {
public TypeSerializer<T> getValueSerializer() {
return (TypeSerializer<T>) fieldSerializers[1];
}

Expand All @@ -329,16 +329,63 @@ public TypeSerializerSnapshot<TtlValue<T>> snapshotConfiguration() {
}

public static boolean isTtlStateSerializer(TypeSerializer<?> typeSerializer) {
boolean ttlSerializer = typeSerializer instanceof TtlStateFactory.TtlSerializer;
boolean ttlListSerializer =
typeSerializer instanceof ListSerializer
&& ((ListSerializer) typeSerializer).getElementSerializer()
instanceof TtlStateFactory.TtlSerializer;
boolean ttlMapSerializer =
typeSerializer instanceof MapSerializer
&& ((MapSerializer) typeSerializer).getValueSerializer()
instanceof TtlStateFactory.TtlSerializer;
return ttlSerializer || ttlListSerializer || ttlMapSerializer;
return isTtlValueStateSerializer(typeSerializer)
|| isTtlListStateSerializer(typeSerializer)
|| isTtlMapStateSerializer(typeSerializer);
}

public static boolean isTtlValueStateSerializer(TypeSerializer<?> typeSerializer) {
return typeSerializer instanceof TtlStateFactory.TtlSerializer;
}

@SuppressWarnings("rawtypes")
public static boolean isTtlListStateSerializer(TypeSerializer<?> typeSerializer) {
return typeSerializer instanceof ListSerializer
&& ((ListSerializer) typeSerializer).getElementSerializer()
instanceof TtlStateFactory.TtlSerializer;
}

@SuppressWarnings("rawtypes")
public static boolean isTtlMapStateSerializer(TypeSerializer<?> typeSerializer) {
return typeSerializer instanceof MapSerializer
&& ((MapSerializer) typeSerializer).getValueSerializer()
instanceof TtlStateFactory.TtlSerializer;
}

public static boolean isTtlStateMigration(
TypeSerializer<?> previousSerializer, TypeSerializer<?> newSerializer) {
return isMigrateFromDisablingToEnabling(previousSerializer, newSerializer)
|| isMigrateFromEnablingToDisabling(previousSerializer, newSerializer);
}

public static boolean isMigrateFromDisablingToEnabling(
TypeSerializer<?> previousSerializer, TypeSerializer<?> newSerializer) {
return !isTtlStateSerializer(previousSerializer) && isTtlStateSerializer(newSerializer);
}

public static boolean isMigrateFromEnablingToDisabling(
TypeSerializer<?> previousSerializer, TypeSerializer<?> newSerializer) {
return isTtlStateSerializer(previousSerializer) && !isTtlStateSerializer(newSerializer);
}

public static TypeSerializer<?> extractOriginalTypeSerializer(
TypeSerializer<?> newSerializer) {
if (isTtlValueStateSerializer(newSerializer)) {
return ((TtlStateFactory.TtlSerializer<?>) newSerializer).getValueSerializer();
} else if (isTtlListStateSerializer(newSerializer)) {
TtlStateFactory.TtlSerializer<?> elementSerializer =
(TtlStateFactory.TtlSerializer<?>)
((ListSerializer<?>) newSerializer).getElementSerializer();
return new ListSerializer<>(elementSerializer.getValueSerializer());
} else if (isTtlMapStateSerializer(newSerializer)) {
MapSerializer<?, ?> mapSerializer = ((MapSerializer<?, ?>) newSerializer);
TypeSerializer<?> keySerializer = mapSerializer.getKeySerializer();
TtlStateFactory.TtlSerializer<?> valueSerializer =
(TtlStateFactory.TtlSerializer<?>) mapSerializer.getValueSerializer();
return new MapSerializer<>(keySerializer, valueSerializer.getValueSerializer());
}

return newSerializer;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* @param <N> The type of the namespace
* @param <T> Type of the user value of state with TTL
*/
class TtlValueState<K, N, T>
public class TtlValueState<K, N, T>
extends AbstractTtlState<K, N, T, TtlValue<T>, InternalValueState<K, N, TtlValue<T>>>
implements InternalValueState<K, N, T> {
TtlValueState(TtlStateContext<InternalValueState<K, N, TtlValue<T>>, T> tTtlStateContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ void testKeyedValueStateRegistrationFailsIfNewStateSerializerIsIncompatible() {
e -> assertThat(e).hasCauseInstanceOf(StateMigrationException.class));
}

private void testKeyedValueStateUpgrade(
protected void testKeyedValueStateUpgrade(
ValueStateDescriptor<TestType> initialAccessDescriptor,
ValueStateDescriptor<TestType> newAccessDescriptorAfterRestore)
throws Exception {
Expand Down Expand Up @@ -278,7 +278,7 @@ void testKeyedListStateRegistrationFailsIfNewStateSerializerIsIncompatible() {
e -> assertThat(e).hasCauseInstanceOf(StateMigrationException.class));
}

private void testKeyedListStateUpgrade(
protected void testKeyedListStateUpgrade(
ListStateDescriptor<TestType> initialAccessDescriptor,
ListStateDescriptor<TestType> newAccessDescriptorAfterRestore)
throws Exception {
Expand Down Expand Up @@ -438,7 +438,7 @@ private Iterator<Map.Entry<Integer, TestType>> sortedIterator(
return set.iterator();
}

private void testKeyedMapStateUpgrade(
protected void testKeyedMapStateUpgrade(
MapStateDescriptor<Integer, TestType> initialAccessDescriptor,
MapStateDescriptor<Integer, TestType> newAccessDescriptorAfterRestore)
throws Exception {
Expand Down Expand Up @@ -1209,7 +1209,7 @@ void testStateMigrationAfterChangingTTL() throws Exception {
}

@TestTemplate
void testStateMigrationAfterChangingTTLFromEnablingToDisabling() {
protected void testStateMigrationAfterChangingTTLFromEnablingToDisabling() throws Exception {
final String stateName = "test-ttl";

ValueStateDescriptor<TestType> initialAccessDescriptor =
Expand All @@ -1229,12 +1229,11 @@ void testStateMigrationAfterChangingTTLFromEnablingToDisabling() {
}

@TestTemplate
void testStateMigrationAfterChangingTTLFromDisablingToEnabling() {
protected void testStateMigrationAfterChangingTTLFromDisablingToEnabling() throws Exception {
final String stateName = "test-ttl";

ValueStateDescriptor<TestType> initialAccessDescriptor =
new ValueStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer());

ValueStateDescriptor<TestType> newAccessDescriptorAfterRestore =
new ValueStateDescriptor<>(stateName, new TestType.V2TestTypeSerializer());
newAccessDescriptorAfterRestore.enableTimeToLive(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private CheckpointStreamFactory createCheckpointStreamFactory() {
}
}

void createAndRestoreKeyedStateBackend(KeyedStateHandle snapshot) {
public void createAndRestoreKeyedStateBackend(KeyedStateHandle snapshot) {
createAndRestoreKeyedStateBackend(NUMBER_OF_KEY_GROUPS, snapshot);
}

Expand Down Expand Up @@ -150,7 +150,7 @@ private void disposeKeyedStateBackend() {
}
}

KeyedStateHandle takeSnapshot() throws Exception {
public KeyedStateHandle takeSnapshot() throws Exception {
SnapshotResult<KeyedStateHandle> snapshotResult = triggerSnapshot().get();
KeyedStateHandle jobManagerOwnedSnapshot = snapshotResult.getJobManagerOwnedSnapshot();
if (jobManagerOwnedSnapshot != null) {
Expand All @@ -177,7 +177,7 @@ public void setCurrentKey(String key) {
}

@SuppressWarnings("unchecked")
<N, S extends State, V> S createState(
public <N, S extends State, V> S createState(
StateDescriptor<S, V> stateDescriptor,
@SuppressWarnings("SameParameterValue") N defaultNamespace)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public boolean isSavepoint() {
return (TtlMergingStateTestContext<?, UV, ?>) ctx;
}

private void initTest() throws Exception {
protected void initTest() throws Exception {
initTest(
StateTtlConfig.UpdateType.OnCreateAndWrite,
StateTtlConfig.StateVisibility.NeverReturnExpired);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,18 @@ protected boolean supportsKeySerializerCheck() {
// TODO support checking key serializer
return false;
}

@Override
protected void testStateMigrationAfterChangingTTLFromDisablingToEnabling() throws Exception {
if (!(this.delegatedStateBackendSupplier.get() instanceof EmbeddedRocksDBStateBackend)) {
super.testStateMigrationAfterChangingTTLFromDisablingToEnabling();
}
}

@Override
protected void testStateMigrationAfterChangingTTLFromEnablingToDisabling() throws Exception {
if (!(this.delegatedStateBackendSupplier.get() instanceof EmbeddedRocksDBStateBackend)) {
super.testStateMigrationAfterChangingTTLFromEnablingToDisabling();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.ttl.TtlStateFactory;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.ttl.TtlValue;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
Expand Down Expand Up @@ -195,6 +198,34 @@ public void migrateSerializedValue(
}
}

@SuppressWarnings({"unchecked", "rawtypes"})
public void migrateSerializedTtlValue(
DataInputDeserializer serializedOldValueInput,
DataOutputSerializer serializedMigratedValueOutput,
TypeSerializer<V> priorSerializer,
TypeSerializer<V> newSerializer,
TtlTimeProvider ttlTimeProvider)
throws StateMigrationException {
try {
if (TtlStateFactory.TtlSerializer.isMigrateFromDisablingToEnabling(
priorSerializer, newSerializer)) {
V value = priorSerializer.deserialize(serializedOldValueInput);
TtlStateFactory.TtlSerializer<V> ttlSerializer =
(TtlStateFactory.TtlSerializer<V>) newSerializer;
TtlValue<V> ttlValue =
ttlSerializer.createInstance(ttlTimeProvider.currentTimestamp(), value);
ttlSerializer.serialize(ttlValue, serializedMigratedValueOutput);
} else {
TtlStateFactory.TtlSerializer<V> ttlSerializer =
(TtlStateFactory.TtlSerializer<V>) priorSerializer;
TtlValue<V> ttlValue = ttlSerializer.deserialize(serializedOldValueInput);
newSerializer.serialize(ttlValue.getUserValue(), serializedMigratedValueOutput);
}
} catch (Exception e) {
throw new StateMigrationException("Error while trying to migrate RocksDB state.", e);
}
}

byte[] getKeyBytes() {
return serializeCurrentKeyWithGroupAndNamespace();
}
Expand Down Expand Up @@ -233,6 +264,11 @@ protected AbstractRocksDBState<K, N, V> setDefaultValue(V defaultValue) {
return this;
}

protected AbstractRocksDBState<K, N, V> setColumnFamily(ColumnFamilyHandle columnFamily) {
this.columnFamily = columnFamily;
return this;
}

@Override
public StateIncrementalVisitor<K, N, V> getStateIncrementalVisitor(
int recommendedMaxNumberOfReturnedRecords) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ static <K, N, SV, S extends State, IS extends S> IS update(
((AggregatingStateDescriptor) stateDesc).getAggregateFunction())
.setNamespaceSerializer(registerResult.f1.getNamespaceSerializer())
.setValueSerializer(registerResult.f1.getStateSerializer())
.setDefaultValue(stateDesc.getDefaultValue());
.setDefaultValue(stateDesc.getDefaultValue())
.setColumnFamily(registerResult.f0);
}
}
Loading

0 comments on commit b8da423

Please sign in to comment.