Skip to content

Commit

Permalink
Support state migration from disabling to enabling ttl for RocksDBState
Browse files Browse the repository at this point in the history
[FLINK-35780][state] Support state migration from disabling to enabling ttl in RocksDBState
  • Loading branch information
xiangyuf committed Jul 7, 2024
1 parent f41927c commit 55b44e1
Show file tree
Hide file tree
Showing 8 changed files with 286 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.runtime.state.ttl.TtlStateFactory;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnull;
Expand Down Expand Up @@ -294,7 +297,7 @@ private static class LazilyRegisteredStateSerializerProvider<T>

@Nonnull
@Override
@SuppressWarnings("ConstantConditions")
@SuppressWarnings({"ConstantConditions", "unchecked"})
public TypeSerializerSchemaCompatibility<T> registerNewSerializerForRestoredState(
TypeSerializer<T> newSerializer) {
checkNotNull(newSerializer);
Expand All @@ -303,8 +306,13 @@ public TypeSerializerSchemaCompatibility<T> registerNewSerializerForRestoredStat
"A serializer has already been registered for the state; re-registration is not allowed.");
}

TypeSerializer<T> originalSerializer = newSerializer;
if (isTtlMigration(newSerializer)) {
originalSerializer = getTtlValueSerializer(newSerializer);
}

TypeSerializerSchemaCompatibility<T> result =
newSerializer
originalSerializer
.snapshotConfiguration()
.resolveSchemaCompatibility(previousSerializerSnapshot);
if (result.isIncompatible()) {
Expand All @@ -325,6 +333,36 @@ public TypeSerializerSchemaCompatibility<T> setPreviousSerializerSnapshotForRest
throw new UnsupportedOperationException(
"The snapshot of the state's previous serializer has already been set; cannot reset.");
}

/**
 * Strips the TTL wrapper from {@code newSerializer} so that it can be compared against a
 * serializer snapshot that was taken without TTL.
 *
 * <ul>
 *   <li>TTL value serializer: returns its wrapped value serializer.
 *   <li>List serializer with TTL elements: returns a new {@link ListSerializer} over the
 *       element's wrapped value serializer.
 *   <li>Map serializer with TTL values: returns a new {@link MapSerializer} with the same key
 *       serializer and the wrapped value serializer.
 *   <li>Anything else: returns {@code newSerializer} unchanged.
 * </ul>
 *
 * @param newSerializer the serializer being registered for the restored state
 * @return the serializer with the TTL layer removed, or {@code newSerializer} itself
 */
@SuppressWarnings({"unchecked"})
private TypeSerializer<T> getTtlValueSerializer(TypeSerializer<T> newSerializer) {
    // Plain value state: the TTL serializer directly wraps the user value serializer.
    if (TtlStateFactory.TtlSerializer.isTtlValueStateSerializer(newSerializer)) {
        TtlStateFactory.TtlSerializer<T> ttlSerializer =
                (TtlStateFactory.TtlSerializer<T>) newSerializer;
        return ttlSerializer.getValueSerializer();
    }

    // List state: the TTL wrapper sits on the element serializer.
    if (TtlStateFactory.TtlSerializer.isTtlListStateSerializer(newSerializer)) {
        ListSerializer<T> ttlListSerializer = (ListSerializer<T>) newSerializer;
        TtlStateFactory.TtlSerializer<T> ttlElementSerializer =
                (TtlStateFactory.TtlSerializer<T>) ttlListSerializer.getElementSerializer();
        TypeSerializer<T> plainElementSerializer = ttlElementSerializer.getValueSerializer();
        return (TypeSerializer<T>) new ListSerializer<>(plainElementSerializer);
    }

    // Map state: the TTL wrapper sits on the map's value serializer; keys are untouched.
    if (TtlStateFactory.TtlSerializer.isTtlMapStateSerializer(newSerializer)) {
        MapSerializer<?, ?> ttlMapSerializer = (MapSerializer<?, ?>) newSerializer;
        TypeSerializer<?> mapKeySerializer = ttlMapSerializer.getKeySerializer();
        TtlStateFactory.TtlSerializer<?> ttlMapValueSerializer =
                (TtlStateFactory.TtlSerializer<?>) ttlMapSerializer.getValueSerializer();
        return (TypeSerializer<T>)
                new MapSerializer<>(mapKeySerializer, ttlMapValueSerializer.getValueSerializer());
    }

    // Not a TTL serializer: nothing to unwrap.
    return newSerializer;
}

/**
 * Checks whether registering {@code newSerializer} switches the state from a non-TTL
 * serializer to a TTL serializer.
 *
 * @param newSerializer the serializer being registered for the restored state
 * @return {@code true} only if a previous serializer snapshot exists, the previous schema
 *     serializer is not a TTL state serializer, and {@code newSerializer} is one
 */
private boolean isTtlMigration(TypeSerializer<T> newSerializer) {
    // Without a previous snapshot there is nothing to migrate from.
    if (previousSerializerSnapshot == null) {
        return false;
    }

    // State that already carried TTL is not a disabled -> enabled transition.
    if (TtlStateFactory.TtlSerializer.isTtlStateSerializer(previousSchemaSerializer())) {
        return false;
    }

    return TtlStateFactory.TtlSerializer.isTtlStateSerializer(newSerializer);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,12 +314,12 @@ protected CompositeSerializer<TtlValue<T>> createSerializerInstance(
}

/**
 * Returns the serializer stored at index 0 of {@code fieldSerializers}, viewed as a
 * {@code TypeSerializer<Long>}.
 */
@SuppressWarnings("unchecked")
TypeSerializer<Long> getTimestampSerializer() {
public TypeSerializer<Long> getTimestampSerializer() {
return (TypeSerializer<Long>) (TypeSerializer<?>) fieldSerializers[0];
}

/**
 * Returns the serializer stored at index 1 of {@code fieldSerializers}, viewed as a
 * {@code TypeSerializer<T>}.
 */
@SuppressWarnings("unchecked")
TypeSerializer<T> getValueSerializer() {
public TypeSerializer<T> getValueSerializer() {
return (TypeSerializer<T>) fieldSerializers[1];
}

Expand All @@ -329,16 +329,25 @@ public TypeSerializerSnapshot<TtlValue<T>> snapshotConfiguration() {
}

/**
 * Returns {@code true} if {@code typeSerializer} is a TTL serializer itself, a
 * {@link ListSerializer} whose element serializer is a TTL serializer, or a
 * {@link MapSerializer} whose value serializer is a TTL serializer.
 */
public static boolean isTtlStateSerializer(TypeSerializer<?> typeSerializer) {
boolean ttlSerializer = typeSerializer instanceof TtlStateFactory.TtlSerializer;
boolean ttlListSerializer =
typeSerializer instanceof ListSerializer
&& ((ListSerializer) typeSerializer).getElementSerializer()
instanceof TtlStateFactory.TtlSerializer;
boolean ttlMapSerializer =
typeSerializer instanceof MapSerializer
&& ((MapSerializer) typeSerializer).getValueSerializer()
instanceof TtlStateFactory.TtlSerializer;
return ttlSerializer || ttlListSerializer || ttlMapSerializer;
return isTtlValueStateSerializer(typeSerializer)
|| isTtlListStateSerializer(typeSerializer)
|| isTtlMapStateSerializer(typeSerializer);
}

/**
 * Tells whether the given serializer is itself a {@link TtlStateFactory.TtlSerializer}.
 *
 * @param typeSerializer the serializer to inspect; {@code null} yields {@code false}
 * @return {@code true} if {@code typeSerializer} is a TTL serializer instance
 */
public static boolean isTtlValueStateSerializer(TypeSerializer<?> typeSerializer) {
    final boolean wrapsTtlValue = typeSerializer instanceof TtlStateFactory.TtlSerializer;
    return wrapsTtlValue;
}

/**
 * Tells whether the given serializer is a {@link ListSerializer} whose element serializer is a
 * {@link TtlStateFactory.TtlSerializer}.
 *
 * @param typeSerializer the serializer to inspect; {@code null} yields {@code false}
 * @return {@code true} if it is a list serializer over TTL-wrapped elements
 */
public static boolean isTtlListStateSerializer(TypeSerializer<?> typeSerializer) {
    if (!(typeSerializer instanceof ListSerializer)) {
        return false;
    }
    TypeSerializer<?> elementSerializer =
            ((ListSerializer<?>) typeSerializer).getElementSerializer();
    return elementSerializer instanceof TtlStateFactory.TtlSerializer;
}

/**
 * Tells whether the given serializer is a {@link MapSerializer} whose value serializer is a
 * {@link TtlStateFactory.TtlSerializer}.
 *
 * @param typeSerializer the serializer to inspect; {@code null} yields {@code false}
 * @return {@code true} if it is a map serializer over TTL-wrapped values
 */
public static boolean isTtlMapStateSerializer(TypeSerializer<?> typeSerializer) {
    if (!(typeSerializer instanceof MapSerializer)) {
        return false;
    }
    TypeSerializer<?> mapValueSerializer =
            ((MapSerializer<?, ?>) typeSerializer).getValueSerializer();
    return mapValueSerializer instanceof TtlStateFactory.TtlSerializer;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* @param <N> The type of the namespace
* @param <T> Type of the user value of state with TTL
*/
class TtlValueState<K, N, T>
public class TtlValueState<K, N, T>
extends AbstractTtlState<K, N, T, TtlValue<T>, InternalValueState<K, N, TtlValue<T>>>
implements InternalValueState<K, N, T> {
TtlValueState(TtlStateContext<InternalValueState<K, N, TtlValue<T>>, T> tTtlStateContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1229,24 +1229,44 @@ void testStateMigrationAfterChangingTTLFromEnablingToDisabling() {
}

@TestTemplate
void testStateMigrationAfterChangingTTLFromDisablingToEnabling() {
void testStateMigrationAfterChangingTTLFromDisablingToEnabling() throws Exception {
final String stateName = "test-ttl";

ValueStateDescriptor<TestType> initialAccessDescriptor =
new ValueStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer());

ValueStateDescriptor<TestType> newAccessDescriptorAfterRestore =
new ValueStateDescriptor<>(stateName, new TestType.V2TestTypeSerializer());
new ValueStateDescriptor<>(
stateName,
// restore with a V2 serializer that has a different schema
new TestType.V2TestTypeSerializer());
newAccessDescriptorAfterRestore.enableTimeToLive(
StateTtlConfig.newBuilder(Time.days(1)).build());

assertThatThrownBy(
() ->
testKeyedValueStateUpgrade(
initialAccessDescriptor, newAccessDescriptorAfterRestore))
.satisfiesAnyOf(
e -> assertThat(e).isInstanceOf(StateMigrationException.class),
e -> assertThat(e).hasCauseInstanceOf(StateMigrationException.class));
ListStateDescriptor<TestType> initialAccessListDescriptor =
new ListStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer());
ListStateDescriptor<TestType> newAccessListDescriptorAfterRestore =
new ListStateDescriptor<>(
stateName,
// restore with a V2 serializer that has a different schema
new TestType.V2TestTypeSerializer());
newAccessListDescriptorAfterRestore.enableTimeToLive(
StateTtlConfig.newBuilder(Time.days(1)).build());

MapStateDescriptor<Integer, TestType> initialAccessMapDescriptor =
new MapStateDescriptor<>(
stateName, IntSerializer.INSTANCE, new TestType.V1TestTypeSerializer());
MapStateDescriptor<Integer, TestType> newAccessMapDescriptorAfterRestore =
new MapStateDescriptor<>(
stateName,
IntSerializer.INSTANCE,
// restore with a V2 serializer that has a different schema
new TestType.V2TestTypeSerializer());
newAccessMapDescriptorAfterRestore.enableTimeToLive(
StateTtlConfig.newBuilder(Time.days(1)).build());

testKeyedValueStateUpgrade(initialAccessDescriptor, newAccessDescriptorAfterRestore);
testKeyedListStateUpgrade(initialAccessListDescriptor, newAccessListDescriptorAfterRestore);
testKeyedMapStateUpgrade(initialAccessMapDescriptor, newAccessMapDescriptorAfterRestore);
}

// -------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@

import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.ttl.TtlStateFactory;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.ttl.TtlValue;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
Expand All @@ -35,6 +39,8 @@
import org.rocksdb.WriteOptions;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
* Base class for {@link State} implementations that store state in a RocksDB database.
Expand Down Expand Up @@ -195,6 +201,47 @@ public void migrateSerializedValue(
}
}

/**
 * Migrates one serialized value that was written without TTL into the serialized form expected
 * by a TTL-enabled serializer.
 *
 * <p>The old bytes are read with {@code priorSerializer}. The resulting user value is then
 * wrapped together with {@code ttlTimeProvider.currentTimestamp()} and written to
 * {@code serializedMigratedValueOutput} using {@code newSerializer}:
 *
 * <ul>
 *   <li>TTL value serializer: the value is wrapped as a single TTL value.
 *   <li>Map serializer with TTL values: every map value is wrapped, keys are kept as they are.
 * </ul>
 *
 * <p>Any other {@code newSerializer} is rejected. Previously such a serializer was silently
 * accepted: the old value was consumed and nothing was written, so the migrated output was
 * empty. NOTE(review): TTL list serializers are not handled here; presumably list state
 * overrides this method - verify against the list state implementation.
 *
 * @param serializedOldValueInput input positioned at the old serialized value
 * @param serializedMigratedValueOutput output that receives the migrated serialized value
 * @param priorSerializer serializer that reads the old (non-TTL) format
 * @param newSerializer TTL-enabled serializer that writes the new format
 * @param ttlTimeProvider source of the timestamp attached to migrated values
 * @throws StateMigrationException if reading, wrapping or writing fails, or if
 *     {@code newSerializer} is not a supported TTL serializer
 */
@SuppressWarnings({"unchecked", "rawtypes"})
public void migrateSerializedTtlValue(
        DataInputDeserializer serializedOldValueInput,
        DataOutputSerializer serializedMigratedValueOutput,
        TypeSerializer<V> priorSerializer,
        TypeSerializer<V> newSerializer,
        TtlTimeProvider ttlTimeProvider)
        throws StateMigrationException {
    try {
        V value = priorSerializer.deserialize(serializedOldValueInput);
        if (TtlStateFactory.TtlSerializer.isTtlValueStateSerializer(newSerializer)) {
            TtlStateFactory.TtlSerializer<V> ttlSerializer =
                    (TtlStateFactory.TtlSerializer<V>) newSerializer;
            TtlValue<V> ttlValue =
                    ttlSerializer.createInstance(ttlTimeProvider.currentTimestamp(), value);
            ttlSerializer.serialize(ttlValue, serializedMigratedValueOutput);
        } else if (TtlStateFactory.TtlSerializer.isTtlMapStateSerializer(newSerializer)) {
            MapSerializer ttlMapSerializer = (MapSerializer) newSerializer;
            Map ttlValueMap = getTtlValueMap(ttlTimeProvider, ttlMapSerializer, (Map) value);
            ttlMapSerializer.serialize(ttlValueMap, serializedMigratedValueOutput);
        } else {
            // Writing nothing would silently replace the stored value with empty bytes,
            // so fail the migration instead of corrupting the state.
            throw new UnsupportedOperationException(
                    "Unsupported serializer for TTL state migration: "
                            + (newSerializer == null
                                    ? "null"
                                    : newSerializer.getClass().getName()));
        }
    } catch (Exception e) {
        throw new StateMigrationException("Error while trying to migrate RocksDB state.", e);
    }
}

/**
 * Builds a new map that holds the same keys as {@code valueMap}, with every value wrapped into
 * a TTL value created by the map serializer's value serializer.
 *
 * <p>The timestamp is read from {@code ttlTimeProvider} once per entry.
 *
 * @param ttlTimeProvider source of the timestamp attached to each wrapped value
 * @param ttlMapSerializer map serializer whose value serializer is a TTL serializer
 * @param valueMap the map of plain (non-TTL) user values
 * @return a new {@link HashMap} from the original keys to the TTL-wrapped values
 */
@SuppressWarnings({"unchecked", "rawtypes"})
private static Map getTtlValueMap(
        TtlTimeProvider ttlTimeProvider, MapSerializer ttlMapSerializer, Map valueMap) {
    TtlStateFactory.TtlSerializer wrappingSerializer =
            (TtlStateFactory.TtlSerializer) ttlMapSerializer.getValueSerializer();
    Map wrappedValues = new HashMap<>();
    for (Object rawEntry : valueMap.entrySet()) {
        Map.Entry entry = (Map.Entry) rawEntry;
        Object wrappedValue =
                wrappingSerializer.createInstance(
                        ttlTimeProvider.currentTimestamp(), entry.getValue());
        wrappedValues.put(entry.getKey(), wrappedValue);
    }
    return wrappedValues;
}

byte[] getKeyBytes() {
return serializeCurrentKeyWithGroupAndNamespace();
}
Expand Down
Loading

0 comments on commit 55b44e1

Please sign in to comment.