Skip to content

Commit

Permalink
[FLINK-35780][state] Support state migration from disabling to enabli…
Browse files Browse the repository at this point in the history
…ng ttl in RocksDBState
  • Loading branch information
xiangyuf committed Oct 6, 2024
1 parent 25969c9 commit e7e602c
Show file tree
Hide file tree
Showing 26 changed files with 1,003 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -54,6 +55,8 @@ enum Type {
*/
COMPATIBLE_AFTER_MIGRATION,

COMPATIBLE_AFTER_TTL_MIGRATION,

/**
* This indicates that a reconfigured version of the new serializer is compatible, and
* should be used instead of the original new serializer.
Expand Down Expand Up @@ -95,6 +98,23 @@ public static <T> TypeSerializerSchemaCompatibility<T> compatibleAfterMigration(
return new TypeSerializerSchemaCompatibility<>(Type.COMPATIBLE_AFTER_MIGRATION, null);
}

public static <T> TypeSerializerSchemaCompatibility<T> compatibleAfterTtlMigration() {
return new TypeSerializerSchemaCompatibility<>(Type.COMPATIBLE_AFTER_TTL_MIGRATION, null);
}

public static <T> TypeSerializerSchemaCompatibility<T> resolveCompatibilityForTtlMigration(
@Nonnull TypeSerializerSchemaCompatibility<T> originalCompatibility) {
switch (originalCompatibility.resultType) {
case COMPATIBLE_AS_IS:
case COMPATIBLE_AFTER_MIGRATION:
case COMPATIBLE_WITH_RECONFIGURED_SERIALIZER:
return compatibleAfterTtlMigration();
case INCOMPATIBLE:
default:
return incompatible();
}
}

/**
* Returns a result that indicates a reconfigured version of the new serializer is compatible,
* and should be used instead of the original new serializer.
Expand Down Expand Up @@ -147,7 +167,12 @@ public boolean isCompatibleAsIs() {
* Type#COMPATIBLE_AFTER_MIGRATION}.
*/
public boolean isCompatibleAfterMigration() {
return resultType == Type.COMPATIBLE_AFTER_MIGRATION;
return resultType == Type.COMPATIBLE_AFTER_MIGRATION
|| resultType == Type.COMPATIBLE_AFTER_TTL_MIGRATION;
}

public boolean isCompatibleAfterTtlMigration() {
return resultType == Type.COMPATIBLE_AFTER_TTL_MIGRATION;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;

import java.util.List;

Expand Down Expand Up @@ -54,4 +55,9 @@ protected ListSerializer<T> createOuterSerializerWithNestedSerializers(
protected TypeSerializer<?>[] getNestedSerializers(ListSerializer<T> outerSerializer) {
return new TypeSerializer<?>[] {outerSerializer.getElementSerializer()};
}

@SuppressWarnings("unchecked")
public TypeSerializerSnapshot<T> getElementSerializerSnapshot() {
return (TypeSerializerSnapshot<T>) getNestedSerializerSnapshots()[0];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,9 @@ protected TypeSerializer<?>[] getNestedSerializers(MapSerializer<K, V> outerSeri
public TypeSerializerSnapshot<K> getKeySerializerSnapshot() {
return (TypeSerializerSnapshot<K>) getNestedSerializerSnapshots()[0];
}

@SuppressWarnings("unchecked")
public TypeSerializerSnapshot<V> getValueSerializerSnapshot() {
return (TypeSerializerSnapshot<V>) getNestedSerializerSnapshots()[1];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.ttl.TtlAwareSerializerSnapshotFactory;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -84,24 +85,39 @@ public RegisteredKeyValueStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot sna
snapshot.getOption(
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)),
snapshot.getName(),
StateSerializerProvider.fromPreviousSerializerSnapshot(
(TypeSerializerSnapshot<N>)
Preconditions.checkNotNull(
snapshot.getTypeSerializerSnapshot(
StateMetaInfoSnapshot.CommonSerializerKeys
.NAMESPACE_SERIALIZER))),
StateSerializerProvider.fromPreviousSerializerSnapshot(
(TypeSerializerSnapshot<S>)
Preconditions.checkNotNull(
snapshot.getTypeSerializerSnapshot(
StateMetaInfoSnapshot.CommonSerializerKeys
.VALUE_SERIALIZER))),
(TypeSerializerSnapshot<N>)
Preconditions.checkNotNull(
snapshot.getTypeSerializerSnapshot(
StateMetaInfoSnapshot.CommonSerializerKeys
.NAMESPACE_SERIALIZER)),
(TypeSerializerSnapshot<S>)
Preconditions.checkNotNull(
snapshot.getTypeSerializerSnapshot(
StateMetaInfoSnapshot.CommonSerializerKeys
.VALUE_SERIALIZER)),
StateSnapshotTransformFactory.noTransform());

Preconditions.checkState(
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE == snapshot.getBackendStateType());
}

public RegisteredKeyValueStateBackendMetaInfo(
@Nonnull StateDescriptor.Type stateType,
@Nonnull String name,
@Nonnull TypeSerializerSnapshot<N> namespaceSerializer,
@Nonnull TypeSerializerSnapshot<S> stateSerializer,
@Nonnull StateSnapshotTransformFactory<S> stateSnapshotTransformFactory) {

this(
stateType,
name,
StateSerializerProvider.fromPreviousSerializerSnapshot(namespaceSerializer),
StateSerializerProvider.fromPreviousSerializerSnapshot(
new TtlAwareSerializerSnapshotFactory<>(stateType, stateSerializer)
.getWrappedTtlAwareTypeSerializerSnapshot()),
stateSnapshotTransformFactory);
}

private RegisteredKeyValueStateBackendMetaInfo(
@Nonnull StateDescriptor.Type stateType,
@Nonnull String name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,6 @@ public TypeSerializerSchemaCompatibility<T> registerNewSerializerForRestoredStat
throw new UnsupportedOperationException(
"A serializer has already been registered for the state; re-registration is not allowed.");
}

TypeSerializerSchemaCompatibility<T> result =
newSerializer
.snapshotConfiguration()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,10 @@ private <N, V> StateTable<K, N, V> tryRegisterStateTable(
TypeSerializer<V> previousStateSerializer = restoredKvMetaInfo.getStateSerializer();

TypeSerializerSchemaCompatibility<V> stateCompatibility =
restoredKvMetaInfo.updateStateSerializer(newStateSerializer);
newStateSerializer
.snapshotConfiguration()
.resolveSchemaCompatibility(
previousStateSerializer.snapshotConfiguration());

if (stateCompatibility.isIncompatible()) {
throw new StateMigrationException(
Expand All @@ -252,6 +255,7 @@ private <N, V> StateTable<K, N, V> tryRegisterStateTable(
+ previousStateSerializer
+ ").");
}
restoredKvMetaInfo.updateStateSerializer(newStateSerializer);

restoredKvMetaInfo =
allowFutureMetadataUpdates
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.ttl;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.function.SupplierWithException;

import java.io.IOException;
import java.util.Objects;

/** Serializer to wrap state value with or without TTL. */
public class TtlAwareSerializer<T> extends TypeSerializer<T> {

private final boolean isTtlEnabled;

private final TypeSerializer<T> typeSerializer;

public TtlAwareSerializer(TypeSerializer<T> typeSerializer) {
this.typeSerializer = typeSerializer;
this.isTtlEnabled = typeSerializer instanceof TtlStateFactory.TtlSerializer;
}

@Override
public boolean isImmutableType() {
return typeSerializer.isImmutableType();
}

@Override
public TypeSerializer<T> duplicate() {
return new TtlAwareSerializer<>(typeSerializer.duplicate());
}

@Override
public T createInstance() {
return typeSerializer.createInstance();
}

@Override
public T copy(T from) {
return typeSerializer.copy(from);
}

@Override
public T copy(T from, T reuse) {
return typeSerializer.copy(from, reuse);
}

@Override
public int getLength() {
return typeSerializer.getLength();
}

@Override
public void serialize(T record, DataOutputView target) throws IOException {
typeSerializer.serialize(record, target);
}

@Override
public T deserialize(DataInputView source) throws IOException {
return typeSerializer.deserialize(source);
}

@Override
public T deserialize(T reuse, DataInputView source) throws IOException {
return typeSerializer.deserialize(reuse, source);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TtlAwareSerializer<?> that = (TtlAwareSerializer<?>) o;
return isTtlEnabled == that.isTtlEnabled
&& Objects.equals(typeSerializer, that.typeSerializer);
}

@Override
public int hashCode() {
return Objects.hash(isTtlEnabled, typeSerializer);
}

@SuppressWarnings("unchecked")
public void migrateValueFromPriorSerializer(
boolean priorTtlEnabled,
SupplierWithException<T, IOException> inputSupplier,
DataOutputView target,
TtlTimeProvider ttlTimeProvider)
throws IOException {
T outputRecord;
if (this.isTtlEnabled()) {
outputRecord =
priorTtlEnabled
? inputSupplier.get()
: (T)
new TtlValue<>(
inputSupplier.get(),
ttlTimeProvider.currentTimestamp());
} else {
outputRecord =
priorTtlEnabled
? ((TtlValue<T>) inputSupplier.get()).getUserValue()
: inputSupplier.get();
}
this.serialize(outputRecord, target);
}

@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
typeSerializer.copy(source, target);
}

public boolean isTtlEnabled() {
return isTtlEnabled;
}

public TypeSerializer<T> getOriginalTypeSerializer() {
return typeSerializer;
}

@Override
public TypeSerializerSnapshot<T> snapshotConfiguration() {
return new TtlAwareSerializerSnapshotFactory.TtlAwareSerializerSnapshot<>(
typeSerializer.snapshotConfiguration(), isTtlEnabled);
}

@SuppressWarnings("rawtypes")
public static boolean isTtlStateSerializer(TypeSerializer<?> typeSerializer) {
boolean ttlSerializer =
typeSerializer instanceof TtlAwareSerializer
&& ((TtlAwareSerializer) typeSerializer).isTtlEnabled();
boolean ttlListSerializer =
typeSerializer instanceof ListSerializer
&& ((ListSerializer) typeSerializer).getElementSerializer()
instanceof TtlAwareSerializer
&& ((TtlAwareSerializer)
((ListSerializer) typeSerializer).getElementSerializer())
.isTtlEnabled();
boolean ttlMapSerializer =
typeSerializer instanceof MapSerializer
&& ((MapSerializer) typeSerializer).getValueSerializer()
instanceof TtlAwareSerializer
&& ((TtlAwareSerializer)
((MapSerializer) typeSerializer).getValueSerializer())
.isTtlEnabled();
return ttlSerializer || ttlListSerializer || ttlMapSerializer;
}
}
Loading

0 comments on commit e7e602c

Please sign in to comment.