diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java index 4ed106e88cc72..5622089bd9ba4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; +import org.apache.flink.runtime.state.ttl.TtlStateFactory; import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; @@ -294,7 +295,7 @@ private static class LazilyRegisteredStateSerializerProvider @Nonnull @Override - @SuppressWarnings("ConstantConditions") + @SuppressWarnings({"ConstantConditions", "unchecked"}) public TypeSerializerSchemaCompatibility registerNewSerializerForRestoredState( TypeSerializer newSerializer) { checkNotNull(newSerializer); @@ -303,10 +304,31 @@ public TypeSerializerSchemaCompatibility registerNewSerializerForRestoredStat "A serializer has already been registered for the state; re-registration is not allowed."); } + TypeSerializer newOriginalSerializer = newSerializer; + TypeSerializer previousOriginalSerializer = previousSchemaSerializer(); + TypeSerializerSnapshot previousOriginalSerializerSnapshot = + previousSerializerSnapshot; + + if (TtlStateFactory.TtlSerializer.isTtlStateMigration( + newOriginalSerializer, previousOriginalSerializer)) { + // extract original TypeSerializer to support TTL migration + // compatibility check + newOriginalSerializer = + (TypeSerializer) + TtlStateFactory.TtlSerializer.extractOriginalTypeSerializer( + newSerializer); + previousOriginalSerializer = + (TypeSerializer) + TtlStateFactory.TtlSerializer.extractOriginalTypeSerializer( + previousOriginalSerializer); + previousOriginalSerializerSnapshot = + previousOriginalSerializer.snapshotConfiguration(); + } + TypeSerializerSchemaCompatibility result = - newSerializer + newOriginalSerializer .snapshotConfiguration() - .resolveSchemaCompatibility(previousSerializerSnapshot); + .resolveSchemaCompatibility(previousOriginalSerializerSnapshot); if (result.isIncompatible()) { invalidateCurrentSchemaSerializerAccess(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index daabc9302c86b..67a1892ece95b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -242,7 +242,10 @@ private StateTable tryRegisterStateTable( TypeSerializer previousStateSerializer = restoredKvMetaInfo.getStateSerializer(); TypeSerializerSchemaCompatibility stateCompatibility = - restoredKvMetaInfo.updateStateSerializer(newStateSerializer); + newStateSerializer + .snapshotConfiguration() + .resolveSchemaCompatibility( + previousStateSerializer.snapshotConfiguration()); if (stateCompatibility.isIncompatible()) { throw new StateMigrationException( @@ -252,6 +255,7 @@ private StateTable tryRegisterStateTable( + previousStateSerializer + ")."); } + restoredKvMetaInfo.updateStateSerializer(newStateSerializer); restoredKvMetaInfo = allowFutureMetadataUpdates diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java index df93d4114722f..354d7711d3419 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java @@ -314,12 +314,12 @@ protected CompositeSerializer> createSerializerInstance( } @SuppressWarnings("unchecked") - TypeSerializer getTimestampSerializer() { + public TypeSerializer getTimestampSerializer() { return (TypeSerializer) (TypeSerializer) fieldSerializers[0]; } @SuppressWarnings("unchecked") - TypeSerializer getValueSerializer() { + public TypeSerializer getValueSerializer() { return (TypeSerializer) fieldSerializers[1]; } @@ -329,16 +329,77 @@ public TypeSerializerSnapshot> snapshotConfiguration() { } public static boolean isTtlStateSerializer(TypeSerializer typeSerializer) { - boolean ttlSerializer = typeSerializer instanceof TtlStateFactory.TtlSerializer; - boolean ttlListSerializer = - typeSerializer instanceof ListSerializer - && ((ListSerializer) typeSerializer).getElementSerializer() - instanceof TtlStateFactory.TtlSerializer; - boolean ttlMapSerializer = - typeSerializer instanceof MapSerializer - && ((MapSerializer) typeSerializer).getValueSerializer() - instanceof TtlStateFactory.TtlSerializer; - return ttlSerializer || ttlListSerializer || ttlMapSerializer; + return isTtlValueStateSerializer(typeSerializer) + || isTtlListStateSerializer(typeSerializer) + || isTtlMapStateSerializer(typeSerializer); + } + + public static boolean isTtlValueStateSerializer(TypeSerializer typeSerializer) { + return typeSerializer instanceof TtlStateFactory.TtlSerializer; + } + + @SuppressWarnings("rawtypes") + public static boolean isTtlListStateSerializer(TypeSerializer typeSerializer) { + return typeSerializer instanceof ListSerializer + && ((ListSerializer) typeSerializer).getElementSerializer() + instanceof TtlStateFactory.TtlSerializer; + } + + @SuppressWarnings("rawtypes") + public static boolean isTtlMapStateSerializer(TypeSerializer typeSerializer) { + return typeSerializer instanceof MapSerializer + && ((MapSerializer) typeSerializer).getValueSerializer() + instanceof TtlStateFactory.TtlSerializer; + } + + public static boolean isTtlStateMigration( + TypeSerializer previousSerializer, TypeSerializer newSerializer) { + return isMigrateFromDisablingToEnabling(previousSerializer, newSerializer) + || isMigrateFromEnablingToDisabling(previousSerializer, newSerializer); + } + + public static boolean isMigrateFromDisablingToEnabling( + TypeSerializer previousSerializer, TypeSerializer newSerializer) { + return !isTtlStateSerializer(previousSerializer) && isTtlStateSerializer(newSerializer); + } + + public static boolean isMigrateFromEnablingToDisabling( + TypeSerializer previousSerializer, TypeSerializer newSerializer) { + return isTtlStateSerializer(previousSerializer) && !isTtlStateSerializer(newSerializer); + } + + public static TypeSerializer extractOriginalTypeSerializer( + TypeSerializer newSerializer) { + if (isTtlValueStateSerializer(newSerializer)) { + return ((TtlStateFactory.TtlSerializer) newSerializer).getValueSerializer(); + } else if (isTtlListStateSerializer(newSerializer)) { + TtlStateFactory.TtlSerializer elementSerializer = + (TtlStateFactory.TtlSerializer) + ((ListSerializer) newSerializer).getElementSerializer(); + return new ListSerializer<>(elementSerializer.getValueSerializer()); + } else if (isTtlMapStateSerializer(newSerializer)) { + MapSerializer mapSerializer = ((MapSerializer) newSerializer); + TypeSerializer keySerializer = mapSerializer.getKeySerializer(); + TtlStateFactory.TtlSerializer valueSerializer = + (TtlStateFactory.TtlSerializer) mapSerializer.getValueSerializer(); + return new MapSerializer<>(keySerializer, valueSerializer.getValueSerializer()); + } + + return newSerializer; + } + + public static TypeSerializerSnapshot extractOriginalTypeSerializerSnapshot( + TypeSerializerSnapshot newSerializerSnapshot) { + if (newSerializerSnapshot instanceof TtlSerializerSnapshot) { + TtlSerializerSnapshot ttlSerializerSnapshot = + (TtlSerializerSnapshot) newSerializerSnapshot; + TypeSerializerSnapshot[] nestedSerializerSnapshots = + ttlSerializerSnapshot.getNestedSerializerSnapshots(); + Preconditions.checkState(nestedSerializerSnapshots.length == 2); + return nestedSerializerSnapshots[1]; + } + + return newSerializerSnapshot; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java index 731961b8289ed..e3a20324e5969 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java @@ -161,7 +161,7 @@ void testKeyedValueStateRegistrationFailsIfNewStateSerializerIsIncompatible() { e -> assertThat(e).hasCauseInstanceOf(StateMigrationException.class)); } - private void testKeyedValueStateUpgrade( + protected void testKeyedValueStateUpgrade( ValueStateDescriptor initialAccessDescriptor, ValueStateDescriptor newAccessDescriptorAfterRestore) throws Exception { @@ -278,7 +278,7 @@ void testKeyedListStateRegistrationFailsIfNewStateSerializerIsIncompatible() { e -> assertThat(e).hasCauseInstanceOf(StateMigrationException.class)); } - private void testKeyedListStateUpgrade( + protected void testKeyedListStateUpgrade( ListStateDescriptor initialAccessDescriptor, ListStateDescriptor newAccessDescriptorAfterRestore) throws Exception { @@ -438,7 +438,7 @@ private Iterator> sortedIterator( return set.iterator(); } - private void testKeyedMapStateUpgrade( + protected void testKeyedMapStateUpgrade( MapStateDescriptor initialAccessDescriptor, MapStateDescriptor newAccessDescriptorAfterRestore) throws Exception { @@ -1209,7 +1209,7 @@ void testStateMigrationAfterChangingTTL() throws Exception { } @TestTemplate - void testStateMigrationAfterChangingTTLFromEnablingToDisabling() { + protected void testStateMigrationAfterChangingTTLFromEnablingToDisabling() throws Exception { final String stateName = "test-ttl"; ValueStateDescriptor initialAccessDescriptor = @@ -1229,12 +1229,11 @@ void testStateMigrationAfterChangingTTLFromEnablingToDisabling() { } @TestTemplate - void testStateMigrationAfterChangingTTLFromDisablingToEnabling() { + protected void testStateMigrationAfterChangingTTLFromDisablingToEnabling() throws Exception { final String stateName = "test-ttl"; ValueStateDescriptor initialAccessDescriptor = new ValueStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer()); - ValueStateDescriptor newAccessDescriptorAfterRestore = new ValueStateDescriptor<>(stateName, new TestType.V2TestTypeSerializer()); newAccessDescriptorAfterRestore.enableTimeToLive( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java index 28ecef52e871e..6055afa26c3b3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java @@ -95,7 +95,7 @@ private CheckpointStreamFactory createCheckpointStreamFactory() { } } - void createAndRestoreKeyedStateBackend(KeyedStateHandle snapshot) { + public void createAndRestoreKeyedStateBackend(KeyedStateHandle snapshot) { createAndRestoreKeyedStateBackend(NUMBER_OF_KEY_GROUPS, snapshot); } @@ -150,7 +150,7 @@ private void disposeKeyedStateBackend() { } } - KeyedStateHandle takeSnapshot() throws Exception { + public KeyedStateHandle takeSnapshot() throws Exception { SnapshotResult snapshotResult = triggerSnapshot().get(); KeyedStateHandle jobManagerOwnedSnapshot = snapshotResult.getJobManagerOwnedSnapshot(); if (jobManagerOwnedSnapshot != null) { @@ -177,7 +177,7 @@ public void setCurrentKey(String key) { } @SuppressWarnings("unchecked") - S createState( + public S createState( StateDescriptor stateDescriptor, @SuppressWarnings("SameParameterValue") N defaultNamespace) throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java index bd658dbea440a..1c29703b40508 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java @@ -109,7 +109,7 @@ public boolean isSavepoint() { return (TtlMergingStateTestContext) ctx; } - private void initTest() throws Exception { + protected void initTest() throws Exception { initTest( StateTtlConfig.UpdateType.OnCreateAndWrite, StateTtlConfig.StateVisibility.NeverReturnExpired); diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendMigrationTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendMigrationTest.java index 71cb21e15ef3a..382da2b7fd6de 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendMigrationTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendMigrationTest.java @@ -64,4 +64,18 @@ protected boolean supportsKeySerializerCheck() { // TODO support checking key serializer return false; } + + @Override + protected void testStateMigrationAfterChangingTTLFromDisablingToEnabling() throws Exception { + if (!(this.delegatedStateBackendSupplier.get() instanceof EmbeddedRocksDBStateBackend)) { + super.testStateMigrationAfterChangingTTLFromDisablingToEnabling(); + } + } + + @Override + protected void testStateMigrationAfterChangingTTLFromEnablingToDisabling() throws Exception { + if (!(this.delegatedStateBackendSupplier.get() instanceof EmbeddedRocksDBStateBackend)) { + super.testStateMigrationAfterChangingTTLFromEnablingToDisabling(); + } + } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java index 463c6e79f5d4c..e69ded19fd0ec 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java @@ -26,6 +26,9 @@ import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.runtime.state.ttl.TtlStateFactory; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.runtime.state.ttl.TtlValue; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StateMigrationException; @@ -195,6 +198,34 @@ public void migrateSerializedValue( } } + @SuppressWarnings({"unchecked", "rawtypes"}) + public void migrateSerializedTtlValue( + DataInputDeserializer serializedOldValueInput, + DataOutputSerializer serializedMigratedValueOutput, + TypeSerializer priorSerializer, + TypeSerializer newSerializer, + TtlTimeProvider ttlTimeProvider) + throws StateMigrationException { + try { + if (TtlStateFactory.TtlSerializer.isMigrateFromDisablingToEnabling( + priorSerializer, newSerializer)) { + V value = priorSerializer.deserialize(serializedOldValueInput); + TtlStateFactory.TtlSerializer ttlSerializer = + (TtlStateFactory.TtlSerializer) newSerializer; + TtlValue ttlValue = + ttlSerializer.createInstance(ttlTimeProvider.currentTimestamp(), value); + ttlSerializer.serialize(ttlValue, serializedMigratedValueOutput); + } else { + TtlStateFactory.TtlSerializer ttlSerializer = + (TtlStateFactory.TtlSerializer) priorSerializer; + TtlValue ttlValue = ttlSerializer.deserialize(serializedOldValueInput); + newSerializer.serialize(ttlValue.getUserValue(), serializedMigratedValueOutput); + } + } catch (Exception e) { + throw new StateMigrationException("Error while trying to migrate RocksDB state.", e); + } + } + byte[] getKeyBytes() { return serializeCurrentKeyWithGroupAndNamespace(); } @@ -233,6 +264,11 @@ protected AbstractRocksDBState setDefaultValue(V defaultValue) { return this; } + protected AbstractRocksDBState setColumnFamily(ColumnFamilyHandle columnFamily) { + this.columnFamily = columnFamily; + return this; + } + @Override public StateIncrementalVisitor getStateIncrementalVisitor( int recommendedMaxNumberOfReturnedRecords) { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java index fc556cae3f70b..f1b0d7d1920c3 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java @@ -192,6 +192,7 @@ static IS update( ((AggregatingStateDescriptor) stateDesc).getAggregateFunction()) .setNamespaceSerializer(registerResult.f1.getNamespaceSerializer()) .setValueSerializer(registerResult.f1.getStateSerializer()) - .setDefaultValue(stateDesc.getDefaultValue()); + .setDefaultValue(stateDesc.getDefaultValue()) + .setColumnFamily(registerResult.f0); } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 9f1857a6ccff1..11bb00cadda2d 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -62,6 +62,7 @@ import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper; import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig; +import org.apache.flink.runtime.state.ttl.TtlStateFactory; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkRuntimeException; @@ -88,6 +89,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -676,25 +678,26 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception { RocksDbKvStateInfo newRocksStateInfo; RegisteredKeyValueStateBackendMetaInfo newMetaInfo; + if (oldStateInfo != null) { @SuppressWarnings("unchecked") RegisteredKeyValueStateBackendMetaInfo castedMetaInfo = (RegisteredKeyValueStateBackendMetaInfo) oldStateInfo.metaInfo; - newMetaInfo = - updateRestoredStateMetaInfo( - Tuple2.of(oldStateInfo.columnFamilyHandle, castedMetaInfo), - stateDesc, - namespaceSerializer, - stateSerializer); - + Tuple2> + newRocksDBState = + updateRestoredStateMetaInfo( + Tuple2.of(oldStateInfo.columnFamilyHandle, castedMetaInfo), + stateDesc, + namespaceSerializer, + stateSerializer); + newMetaInfo = newRocksDBState.f1; newMetaInfo = allowFutureMetadataUpdates ? newMetaInfo.withSerializerUpgradesAllowed() : newMetaInfo; - newRocksStateInfo = - new RocksDbKvStateInfo(oldStateInfo.columnFamilyHandle, newMetaInfo); + newRocksStateInfo = new RocksDbKvStateInfo(newRocksDBState.f0, newMetaInfo); kvStateInformation.put(stateDesc.getName(), newRocksStateInfo); sstMergeManager.register(newRocksStateInfo); } else { @@ -738,13 +741,16 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception { } private - RegisteredKeyValueStateBackendMetaInfo updateRestoredStateMetaInfo( - Tuple2> - oldStateInfo, - StateDescriptor stateDesc, - TypeSerializer namespaceSerializer, - TypeSerializer stateSerializer) - throws Exception { + Tuple2> + updateRestoredStateMetaInfo( + Tuple2< + ColumnFamilyHandle, + RegisteredKeyValueStateBackendMetaInfo> + oldStateInfo, + StateDescriptor stateDesc, + TypeSerializer namespaceSerializer, + TypeSerializer stateSerializer) + throws Exception { RegisteredKeyValueStateBackendMetaInfo restoredKvStateMetaInfo = oldStateInfo.f1; @@ -773,7 +779,11 @@ RegisteredKeyValueStateBackendMetaInfo updateRestoredStateMetaInfo( TypeSerializerSchemaCompatibility newStateSerializerCompatibility = restoredKvStateMetaInfo.updateStateSerializer(stateSerializer); if (newStateSerializerCompatibility.isCompatibleAfterMigration()) { - migrateStateValues(stateDesc, oldStateInfo); + migrateStateValues( + stateDesc, + oldStateInfo, + TtlStateFactory.TtlSerializer.isTtlStateMigration( + previousStateSerializer, stateSerializer)); } else if (newStateSerializerCompatibility.isIncompatible()) { throw new StateMigrationException( "The new state serializer (" @@ -783,20 +793,20 @@ RegisteredKeyValueStateBackendMetaInfo updateRestoredStateMetaInfo( + ")."); } - return restoredKvStateMetaInfo; + return oldStateInfo; } /** * Migrate only the state value, that is the "value" that is stored in RocksDB. We don't migrate * the key here, which is made up of key group, key, namespace and map key (in case of - * MapState). + * MapState). Also support migrate state values between disabling and enabling ttl. */ @SuppressWarnings("unchecked") private void migrateStateValues( StateDescriptor stateDesc, - Tuple2> stateMetaInfo) + Tuple2> stateMetaInfo, + boolean isTtlStateMigration) throws Exception { - if (stateDesc.getType() == StateDescriptor.Type.MAP) { TypeSerializerSnapshot previousSerializerSnapshot = stateMetaInfo.f1.getPreviousStateSerializerSnapshot(); @@ -806,12 +816,11 @@ private void migrateStateValues( checkState( previousSerializerSnapshot instanceof MapSerializerSnapshot, "previous serializer snapshot should be a MapSerializerSnapshot."); - TypeSerializer newSerializer = stateMetaInfo.f1.getStateSerializer(); + checkState( newSerializer instanceof MapSerializer, "new serializer should be a MapSerializer."); - MapSerializer mapSerializer = (MapSerializer) newSerializer; MapSerializerSnapshot mapSerializerSnapshot = (MapSerializerSnapshot) previousSerializerSnapshot; @@ -834,7 +843,6 @@ private void migrateStateValues( "State should be an AbstractRocksDBState but is " + state); } - @SuppressWarnings("unchecked") AbstractRocksDBState rocksDBState = (AbstractRocksDBState) state; Snapshot rocksDBSnapshot = db.getSnapshot(); @@ -849,14 +857,39 @@ private void migrateStateValues( DataInputDeserializer serializedValueInput = new DataInputDeserializer(); DataOutputSerializer migratedSerializedValueOutput = new DataOutputSerializer(512); + + if (isTtlStateMigration) { + // By performing ttl state migration, we need to recreate column family to + // enable/disable ttl compaction filter factory. + db.dropColumnFamily(stateMetaInfo.f0); + stateMetaInfo.f0 = + RocksDBOperationUtils.createColumnFamily( + RocksDBOperationUtils.createColumnFamilyDescriptor( + stateMetaInfo.f1, + columnFamilyOptionsFactory, + ttlCompactFiltersManager, + optionsContainer.getWriteBufferManagerCapacity()), + db, + Collections.emptyList(), + ICloseableRegistry.NO_OP); + } + while (iterator.isValid()) { serializedValueInput.setBuffer(iterator.value()); - - rocksDBState.migrateSerializedValue( - serializedValueInput, - migratedSerializedValueOutput, - stateMetaInfo.f1.getPreviousStateSerializer(), - stateMetaInfo.f1.getStateSerializer()); + if (isTtlStateMigration) { + rocksDBState.migrateSerializedTtlValue( + serializedValueInput, + migratedSerializedValueOutput, + stateMetaInfo.f1.getPreviousStateSerializer(), + stateMetaInfo.f1.getStateSerializer(), + this.ttlTimeProvider); + } else { + rocksDBState.migrateSerializedValue( + serializedValueInput, + migratedSerializedValueOutput, + stateMetaInfo.f1.getPreviousStateSerializer(), + stateMetaInfo.f1.getStateSerializer()); + } batchWriter.put( stateMetaInfo.f0, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java index b5339fd4d25af..26e8e2ac167d1 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java @@ -30,6 +30,9 @@ import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; import org.apache.flink.runtime.state.StateSnapshotTransformer; import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.runtime.state.ttl.TtlStateFactory; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.runtime.state.ttl.TtlValue; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StateMigrationException; @@ -226,6 +229,72 @@ public void migrateSerializedValue( } } + @Override + @SuppressWarnings({"unchecked"}) + public void migrateSerializedTtlValue( + DataInputDeserializer serializedOldValueInput, + DataOutputSerializer serializedMigratedValueOutput, + TypeSerializer> priorSerializer, + TypeSerializer> newSerializer, + TtlTimeProvider ttlTimeProvider) + throws StateMigrationException { + Preconditions.checkArgument(priorSerializer instanceof ListSerializer); + Preconditions.checkArgument(newSerializer instanceof ListSerializer); + + if (TtlStateFactory.TtlSerializer.isMigrateFromDisablingToEnabling( + priorSerializer, newSerializer)) { + TypeSerializer priorElementSerializer = + ((ListSerializer) priorSerializer).getElementSerializer(); + + TtlStateFactory.TtlSerializer ttlElementSerializer = + (TtlStateFactory.TtlSerializer) + ((ListSerializer) newSerializer).getElementSerializer(); + + try { + while (serializedOldValueInput.available() > 0) { + V element = + ListDelimitedSerializer.deserializeNextElement( + serializedOldValueInput, priorElementSerializer); + TtlValue ttlElement = + ttlElementSerializer.createInstance( + ttlTimeProvider.currentTimestamp(), element); + ttlElementSerializer.serialize(ttlElement, serializedMigratedValueOutput); + if (serializedOldValueInput.available() > 0) { + serializedMigratedValueOutput.write(DELIMITER); + } + } + } catch (Exception e) { + throw new StateMigrationException( + "Error while trying to migrate RocksDB state.", e); + } + } else { + TtlStateFactory.TtlSerializer ttlElementSerializer = + (TtlStateFactory.TtlSerializer) + ((ListSerializer) priorSerializer).getElementSerializer(); + + TypeSerializer newElementSerializer = + ((ListSerializer) newSerializer).getElementSerializer(); + + try { + while (serializedOldValueInput.available() > 0) { + TtlValue ttlElement = + ListDelimitedSerializer.deserializeNextElement( + serializedOldValueInput, ttlElementSerializer); + if (ttlElement != null) { + V element = ttlElement.getUserValue(); + newElementSerializer.serialize(element, serializedMigratedValueOutput); + if (serializedOldValueInput.available() > 0) { + serializedMigratedValueOutput.write(DELIMITER); + } + } + } + } catch (Exception e) { + throw new StateMigrationException( + "Error while trying to migrate RocksDB state.", e); + } + } + } + @Override protected RocksDBListState setValueSerializer( TypeSerializer> valueSerializer) { @@ -260,7 +329,8 @@ static IS update( .setNamespaceSerializer(registerResult.f1.getNamespaceSerializer()) .setValueSerializer( (TypeSerializer>) registerResult.f1.getStateSerializer()) - .setDefaultValue((List) stateDesc.getDefaultValue()); + .setDefaultValue((List) stateDesc.getDefaultValue()) + .setColumnFamily(registerResult.f0); } static class StateSnapshotTransformerWrapper implements StateSnapshotTransformer { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java index 58a58d8f1576d..c9a56df749682 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java @@ -32,6 +32,9 @@ import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; import org.apache.flink.runtime.state.StateSnapshotTransformer; import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.runtime.state.ttl.TtlStateFactory; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.runtime.state.ttl.TtlValue; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StateMigrationException; @@ -250,6 +253,69 @@ public void migrateSerializedValue( } } + @Override + @SuppressWarnings({"unchecked"}) + public void migrateSerializedTtlValue( + DataInputDeserializer serializedOldValueInput, + DataOutputSerializer serializedMigratedValueOutput, + TypeSerializer> priorSerializer, + TypeSerializer> newSerializer, + TtlTimeProvider ttlTimeProvider) + throws StateMigrationException { + checkArgument(priorSerializer instanceof MapSerializer); + checkArgument(newSerializer instanceof MapSerializer); + + if (TtlStateFactory.TtlSerializer.isMigrateFromDisablingToEnabling( + priorSerializer, newSerializer)) { + + TypeSerializer priorMapValueSerializer = + ((MapSerializer) priorSerializer).getValueSerializer(); + TtlStateFactory.TtlSerializer newMapValueSerializer = + (TtlStateFactory.TtlSerializer) + ((MapSerializer) newSerializer).getValueSerializer(); + + try { + boolean isNull = serializedOldValueInput.readBoolean(); + UV mapUserValue = null; + TtlValue ttlMapUserValue = null; + if (!isNull) { + mapUserValue = priorMapValueSerializer.deserialize(serializedOldValueInput); + ttlMapUserValue = + newMapValueSerializer.createInstance( + ttlTimeProvider.currentTimestamp(), mapUserValue); + } + + serializedMigratedValueOutput.writeBoolean(ttlMapUserValue == null); + newMapValueSerializer.serialize(ttlMapUserValue, serializedMigratedValueOutput); + } catch (Exception e) { + throw new StateMigrationException( + "Error while trying to migrate RocksDB map state.", e); + } + } else { + TtlStateFactory.TtlSerializer ttlMapValueSerializer = + (TtlStateFactory.TtlSerializer) + ((MapSerializer) priorSerializer).getValueSerializer(); + TypeSerializer newMapValueSerializer = + ((MapSerializer) newSerializer).getValueSerializer(); + + try { + boolean isNull = serializedOldValueInput.readBoolean(); + TtlValue ttlMapUserValue; + UV mapUserValue = null; + if (!isNull) { + ttlMapUserValue = ttlMapValueSerializer.deserialize(serializedOldValueInput); + mapUserValue = ttlMapUserValue.getUserValue(); + } + + serializedMigratedValueOutput.writeBoolean(mapUserValue == null); + newMapValueSerializer.serialize(mapUserValue, serializedMigratedValueOutput); + } catch (Exception e) { + throw new StateMigrationException( + "Error while trying to migrate RocksDB map state.", e); + } + } + } + @Override public Iterator> iterator() { final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace(); @@ -726,7 +792,8 @@ static IS update( .setValueSerializer( (TypeSerializer>) registerResult.f1.getStateSerializer()) - .setDefaultValue((Map) stateDesc.getDefaultValue()); + .setDefaultValue((Map) stateDesc.getDefaultValue()) + .setColumnFamily(registerResult.f0); } /** diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java index 74800101cd0b1..7ff67a1370d67 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java @@ -275,7 +275,7 @@ public static ColumnFamilyOptions createColumnFamilyOptions( .setMergeOperatorName(MERGE_OPERATOR_NAME); } - private static ColumnFamilyHandle createColumnFamily( + public static ColumnFamilyHandle createColumnFamily( ColumnFamilyDescriptor columnDescriptor, RocksDB db, List importFilesMetaData, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java index e791b3f591a7e..1a555037fbe86 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java @@ -182,6 +182,7 @@ static IS update( .setReduceFunction( ((ReducingStateDescriptor) stateDesc).getReduceFunction()) .setNamespaceSerializer(registerResult.f1.getNamespaceSerializer()) - .setDefaultValue(stateDesc.getDefaultValue()); + .setDefaultValue(stateDesc.getDefaultValue()) + .setColumnFamily(registerResult.f0); } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java index 958efad068ab2..cc7cea1b15b3e 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java @@ -134,6 +134,7 @@ static IS update( ((RocksDBValueState) existingState) .setNamespaceSerializer(registerResult.f1.getNamespaceSerializer()) .setValueSerializer(registerResult.f1.getStateSerializer()) - .setDefaultValue(stateDesc.getDefaultValue()); + .setDefaultValue(stateDesc.getDefaultValue()) + .setColumnFamily(registerResult.f0); } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendMigrationTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendMigrationTest.java index 0afc366ddc989..d5aedb4885155 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendMigrationTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendMigrationTest.java @@ -18,16 +18,24 @@ package org.apache.flink.contrib.streaming.state; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.StateBackendMigrationTestBase; import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage; +import org.apache.flink.runtime.testutils.statemigration.TestType; import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.apache.flink.testutils.junit.utils.TempDirUtils; import org.apache.flink.util.function.SupplierWithException; +import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; @@ -79,4 +87,85 @@ protected EmbeddedRocksDBStateBackend getStateBackend() throws Exception { protected CheckpointStorage getCheckpointStorage() throws Exception { return storageSupplier.get(); } + + @TestTemplate + protected void testStateMigrationAfterChangingTTLFromDisablingToEnabling() throws Exception { + final String stateName = "test-ttl"; + + ValueStateDescriptor initialAccessDescriptor = + new ValueStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer()); + ValueStateDescriptor newAccessDescriptorAfterRestore = + new ValueStateDescriptor<>( + stateName, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); + newAccessDescriptorAfterRestore.enableTimeToLive( + StateTtlConfig.newBuilder(Time.days(1)).build()); + + ListStateDescriptor initialAccessListDescriptor = + new ListStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer()); + ListStateDescriptor newAccessListDescriptorAfterRestore = + new ListStateDescriptor<>( + stateName, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); + newAccessListDescriptorAfterRestore.enableTimeToLive( + StateTtlConfig.newBuilder(Time.days(1)).build()); + + MapStateDescriptor initialAccessMapDescriptor = + new MapStateDescriptor<>( + stateName, IntSerializer.INSTANCE, new TestType.V1TestTypeSerializer()); + MapStateDescriptor newAccessMapDescriptorAfterRestore = + new MapStateDescriptor<>( + stateName, + IntSerializer.INSTANCE, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); + newAccessMapDescriptorAfterRestore.enableTimeToLive( + StateTtlConfig.newBuilder(Time.days(1)).build()); + + testKeyedValueStateUpgrade(initialAccessDescriptor, newAccessDescriptorAfterRestore); + testKeyedListStateUpgrade(initialAccessListDescriptor, newAccessListDescriptorAfterRestore); + testKeyedMapStateUpgrade(initialAccessMapDescriptor, newAccessMapDescriptorAfterRestore); + } + + @TestTemplate + protected void testStateMigrationAfterChangingTTLFromEnablingToDisabling() throws Exception { + final String stateName = "test-ttl"; + + ValueStateDescriptor initialAccessDescriptor = + new ValueStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer()); + initialAccessDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(1)).build()); + ValueStateDescriptor newAccessDescriptorAfterRestore = + new ValueStateDescriptor<>( + stateName, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); + + ListStateDescriptor initialAccessListDescriptor = + new ListStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer()); + initialAccessListDescriptor.enableTimeToLive( + StateTtlConfig.newBuilder(Time.days(1)).build()); + ListStateDescriptor newAccessListDescriptorAfterRestore = + new ListStateDescriptor<>( + stateName, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); + + MapStateDescriptor initialAccessMapDescriptor = + new MapStateDescriptor<>( + stateName, IntSerializer.INSTANCE, new TestType.V1TestTypeSerializer()); + initialAccessMapDescriptor.enableTimeToLive( + StateTtlConfig.newBuilder(Time.days(1)).build()); + MapStateDescriptor newAccessMapDescriptorAfterRestore = + new MapStateDescriptor<>( + stateName, + IntSerializer.INSTANCE, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); + + testKeyedValueStateUpgrade(initialAccessDescriptor, newAccessDescriptorAfterRestore); + testKeyedListStateUpgrade(initialAccessListDescriptor, newAccessListDescriptorAfterRestore); + testKeyedMapStateUpgrade(initialAccessMapDescriptor, newAccessMapDescriptorAfterRestore); + } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendMigrationTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendMigrationTest.java index 6322f62c98243..1bd5a94e63678 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendMigrationTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendMigrationTest.java @@ -18,14 +18,22 @@ package org.apache.flink.contrib.streaming.state; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.StateBackendMigrationTestBase; import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testutils.statemigration.TestType; import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.apache.flink.testutils.junit.utils.TempDirUtils; +import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; import java.io.IOException; @@ -63,4 +71,85 @@ protected RocksDBStateBackend getStateBackend() throws IOException { backend.setDbStoragePath(dbPath); return backend; } + + @TestTemplate + protected void testStateMigrationAfterChangingTTLFromDisablingToEnabling() throws Exception { + final String stateName = "test-ttl"; + + ValueStateDescriptor initialAccessDescriptor = + new ValueStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer()); + ValueStateDescriptor newAccessDescriptorAfterRestore = + new ValueStateDescriptor<>( + stateName, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); + newAccessDescriptorAfterRestore.enableTimeToLive( + StateTtlConfig.newBuilder(Time.days(1)).build()); + + ListStateDescriptor initialAccessListDescriptor = + new ListStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer()); + ListStateDescriptor newAccessListDescriptorAfterRestore = + new ListStateDescriptor<>( + stateName, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); + newAccessListDescriptorAfterRestore.enableTimeToLive( + StateTtlConfig.newBuilder(Time.days(1)).build()); + + MapStateDescriptor initialAccessMapDescriptor = + new MapStateDescriptor<>( + stateName, IntSerializer.INSTANCE, new TestType.V1TestTypeSerializer()); + MapStateDescriptor newAccessMapDescriptorAfterRestore = + new MapStateDescriptor<>( + stateName, + IntSerializer.INSTANCE, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); + newAccessMapDescriptorAfterRestore.enableTimeToLive( + StateTtlConfig.newBuilder(Time.days(1)).build()); + + testKeyedValueStateUpgrade(initialAccessDescriptor, newAccessDescriptorAfterRestore); + testKeyedListStateUpgrade(initialAccessListDescriptor, newAccessListDescriptorAfterRestore); + testKeyedMapStateUpgrade(initialAccessMapDescriptor, newAccessMapDescriptorAfterRestore); + } + + @TestTemplate + protected void testStateMigrationAfterChangingTTLFromEnablingToDisabling() throws Exception { + final String stateName = "test-ttl"; + + ValueStateDescriptor initialAccessDescriptor = + new ValueStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer()); + initialAccessDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(1)).build()); + ValueStateDescriptor newAccessDescriptorAfterRestore = + new ValueStateDescriptor<>( + stateName, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); + + ListStateDescriptor initialAccessListDescriptor = + new ListStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer()); + initialAccessListDescriptor.enableTimeToLive( + StateTtlConfig.newBuilder(Time.days(1)).build()); + ListStateDescriptor newAccessListDescriptorAfterRestore = + new ListStateDescriptor<>( + stateName, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); + + MapStateDescriptor initialAccessMapDescriptor = + new MapStateDescriptor<>( + stateName, IntSerializer.INSTANCE, new TestType.V1TestTypeSerializer()); + initialAccessMapDescriptor.enableTimeToLive( + StateTtlConfig.newBuilder(Time.days(1)).build()); + MapStateDescriptor newAccessMapDescriptorAfterRestore = + new MapStateDescriptor<>( + stateName, + IntSerializer.INSTANCE, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); + + testKeyedValueStateUpgrade(initialAccessDescriptor, newAccessDescriptorAfterRestore); + testKeyedListStateUpgrade(initialAccessListDescriptor, newAccessListDescriptorAfterRestore); + testKeyedMapStateUpgrade(initialAccessMapDescriptor, newAccessMapDescriptorAfterRestore); + } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/RocksDBTtlStateTestBase.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/RocksDBTtlStateTestBase.java index 14bfc4e838f13..d622d23ebb15d 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/RocksDBTtlStateTestBase.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/RocksDBTtlStateTestBase.java @@ -23,8 +23,10 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.ttl.MockTtlStateTest; import org.apache.flink.runtime.state.ttl.StateBackendTestContext; import org.apache.flink.runtime.state.ttl.TtlStateTestBase; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; @@ -40,6 +42,7 @@ import java.nio.file.Path; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; /** Base test suite for rocksdb state TTL. */ public abstract class RocksDBTtlStateTestBase extends TtlStateTestBase { @@ -90,6 +93,22 @@ public void testCompactFilterWithSnapshotAndRescalingAfterRestore() throws Excep testCompactFilter(true, true); } + @TestTemplate + void testRestoreTtlAndRegisterNonTtlStateCompatFailure() throws Exception { + assumeThat(this).isNotInstanceOf(MockTtlStateTest.class); + + initTest(); + + timeProvider.time = 0; + ctx().update(ctx().updateEmpty); + + KeyedStateHandle snapshot = sbetc.takeSnapshot(); + sbetc.createAndRestoreKeyedStateBackend(snapshot); + + sbetc.setCurrentKey("defaultKey"); + sbetc.createState(ctx().createStateDescriptor(), ""); + } + @SuppressWarnings("resource") private void testCompactFilter(boolean takeSnapshot, boolean rescaleAfterRestore) throws Exception {