diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java index 4ed106e88cc723..017b809bacd08d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java @@ -23,6 +23,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.api.common.typeutils.base.MapSerializer; +import org.apache.flink.runtime.state.ttl.TtlStateFactory; import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; @@ -294,7 +297,7 @@ private static class LazilyRegisteredStateSerializerProvider @Nonnull @Override - @SuppressWarnings("ConstantConditions") + @SuppressWarnings({"ConstantConditions", "unchecked"}) public TypeSerializerSchemaCompatibility registerNewSerializerForRestoredState( TypeSerializer newSerializer) { checkNotNull(newSerializer); @@ -303,8 +306,13 @@ public TypeSerializerSchemaCompatibility registerNewSerializerForRestoredStat "A serializer has already been registered for the state; re-registration is not allowed."); } + TypeSerializer originalSerializer = newSerializer; + if (isTtlMigration(newSerializer)) { + originalSerializer = getTtlValueSerializer(newSerializer); + } + TypeSerializerSchemaCompatibility result = - newSerializer + originalSerializer .snapshotConfiguration() .resolveSchemaCompatibility(previousSerializerSnapshot); if (result.isIncompatible()) { @@ -325,6 +333,36 @@ public TypeSerializerSchemaCompatibility setPreviousSerializerSnapshotForRest throw new UnsupportedOperationException( "The snapshot of the state's previous serializer has already been set; cannot reset."); } + + @SuppressWarnings({"unchecked"}) + private TypeSerializer getTtlValueSerializer(TypeSerializer newSerializer) { + if (TtlStateFactory.TtlSerializer.isTtlValueStateSerializer(newSerializer)) { + return ((TtlStateFactory.TtlSerializer) newSerializer).getValueSerializer(); + } else if (TtlStateFactory.TtlSerializer.isTtlListStateSerializer(newSerializer)) { + TtlStateFactory.TtlSerializer elementSerializer = + (TtlStateFactory.TtlSerializer) + ((ListSerializer) newSerializer).getElementSerializer(); + + return (TypeSerializer) + new ListSerializer<>(elementSerializer.getValueSerializer()); + } else if (TtlStateFactory.TtlSerializer.isTtlMapStateSerializer(newSerializer)) { + MapSerializer mapSerializer = ((MapSerializer) newSerializer); + TypeSerializer keySerializer = mapSerializer.getKeySerializer(); + TtlStateFactory.TtlSerializer valueSerializer = + (TtlStateFactory.TtlSerializer) mapSerializer.getValueSerializer(); + return (TypeSerializer) + new MapSerializer<>(keySerializer, valueSerializer.getValueSerializer()); + } + + return newSerializer; + } + + private boolean isTtlMigration(TypeSerializer newSerializer) { + return previousSerializerSnapshot != null + && !TtlStateFactory.TtlSerializer.isTtlStateSerializer( + previousSchemaSerializer()) + && TtlStateFactory.TtlSerializer.isTtlStateSerializer(newSerializer); + } } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java index df93d4114722f0..e591b82ef475c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java @@ -314,12 +314,12 @@ protected CompositeSerializer> createSerializerInstance( } @SuppressWarnings("unchecked") - TypeSerializer getTimestampSerializer() { + public TypeSerializer getTimestampSerializer() { return (TypeSerializer) (TypeSerializer) fieldSerializers[0]; } @SuppressWarnings("unchecked") - TypeSerializer getValueSerializer() { + public TypeSerializer getValueSerializer() { return (TypeSerializer) fieldSerializers[1]; } @@ -329,16 +329,25 @@ public TypeSerializerSnapshot> snapshotConfiguration() { } public static boolean isTtlStateSerializer(TypeSerializer typeSerializer) { - boolean ttlSerializer = typeSerializer instanceof TtlStateFactory.TtlSerializer; - boolean ttlListSerializer = - typeSerializer instanceof ListSerializer - && ((ListSerializer) typeSerializer).getElementSerializer() - instanceof TtlStateFactory.TtlSerializer; - boolean ttlMapSerializer = - typeSerializer instanceof MapSerializer - && ((MapSerializer) typeSerializer).getValueSerializer() - instanceof TtlStateFactory.TtlSerializer; - return ttlSerializer || ttlListSerializer || ttlMapSerializer; + return isTtlValueStateSerializer(typeSerializer) + || isTtlListStateSerializer(typeSerializer) + || isTtlMapStateSerializer(typeSerializer); + } + + public static boolean isTtlValueStateSerializer(TypeSerializer typeSerializer) { + return typeSerializer instanceof TtlStateFactory.TtlSerializer; + } + + public static boolean isTtlListStateSerializer(TypeSerializer typeSerializer) { + return typeSerializer instanceof ListSerializer + && ((ListSerializer) typeSerializer).getElementSerializer() + instanceof TtlStateFactory.TtlSerializer; + } + + public static boolean isTtlMapStateSerializer(TypeSerializer typeSerializer) { + return typeSerializer instanceof MapSerializer + && ((MapSerializer) typeSerializer).getValueSerializer() + instanceof TtlStateFactory.TtlSerializer; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java index 344f0b5e949e87..6e949fe82ef765 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java @@ -32,7 +32,7 @@ * @param The type of the namespace * @param Type of the user value of state with TTL */ -class TtlValueState +public class TtlValueState extends AbstractTtlState, InternalValueState>> implements InternalValueState { TtlValueState(TtlStateContext>, T> tTtlStateContext) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java index 731961b8289ede..9ad8dede13c4ca 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java @@ -1229,24 +1229,44 @@ void testStateMigrationAfterChangingTTLFromEnablingToDisabling() { } @TestTemplate - void testStateMigrationAfterChangingTTLFromDisablingToEnabling() { + void testStateMigrationAfterChangingTTLFromDisablingToEnabling() throws Exception { final String stateName = "test-ttl"; ValueStateDescriptor initialAccessDescriptor = new ValueStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer()); - ValueStateDescriptor newAccessDescriptorAfterRestore = - new ValueStateDescriptor<>(stateName, new TestType.V2TestTypeSerializer()); + new ValueStateDescriptor<>( + stateName, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); newAccessDescriptorAfterRestore.enableTimeToLive( StateTtlConfig.newBuilder(Time.days(1)).build()); - assertThatThrownBy( - () -> - testKeyedValueStateUpgrade( - initialAccessDescriptor, newAccessDescriptorAfterRestore)) - .satisfiesAnyOf( - e -> assertThat(e).isInstanceOf(StateMigrationException.class), - e -> assertThat(e).hasCauseInstanceOf(StateMigrationException.class)); + ListStateDescriptor initialAccessListDescriptor = + new ListStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer()); + ListStateDescriptor newAccessListDescriptorAfterRestore = + new ListStateDescriptor<>( + stateName, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); + newAccessListDescriptorAfterRestore.enableTimeToLive( + StateTtlConfig.newBuilder(Time.days(1)).build()); + + MapStateDescriptor initialAccessMapDescriptor = + new MapStateDescriptor<>( + stateName, IntSerializer.INSTANCE, new TestType.V1TestTypeSerializer()); + MapStateDescriptor newAccessMapDescriptorAfterRestore = + new MapStateDescriptor<>( + stateName, + IntSerializer.INSTANCE, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); + newAccessMapDescriptorAfterRestore.enableTimeToLive( + StateTtlConfig.newBuilder(Time.days(1)).build()); + + testKeyedValueStateUpgrade(initialAccessDescriptor, newAccessDescriptorAfterRestore); + testKeyedListStateUpgrade(initialAccessListDescriptor, newAccessListDescriptorAfterRestore); + testKeyedMapStateUpgrade(initialAccessMapDescriptor, newAccessMapDescriptorAfterRestore); } // ------------------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java index 463c6e79f5d4c5..cf9022f62b91b3 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java @@ -19,6 +19,7 @@ import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.MapSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; @@ -26,6 +27,9 @@ import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.runtime.state.ttl.TtlStateFactory; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.runtime.state.ttl.TtlValue; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StateMigrationException; @@ -35,6 +39,8 @@ import org.rocksdb.WriteOptions; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; /** * Base class for {@link State} implementations that store state in a RocksDB database. @@ -195,6 +201,47 @@ public void migrateSerializedValue( } } + @SuppressWarnings({"unchecked", "rawtypes"}) + public void migrateSerializedTtlValue( + DataInputDeserializer serializedOldValueInput, + DataOutputSerializer serializedMigratedValueOutput, + TypeSerializer priorSerializer, + TypeSerializer newSerializer, + TtlTimeProvider ttlTimeProvider) + throws StateMigrationException { + try { + V value = priorSerializer.deserialize(serializedOldValueInput); + if (TtlStateFactory.TtlSerializer.isTtlValueStateSerializer(newSerializer)) { + TtlStateFactory.TtlSerializer ttlSerializer = + (TtlStateFactory.TtlSerializer) newSerializer; + TtlValue ttlValue = + ttlSerializer.createInstance(ttlTimeProvider.currentTimestamp(), value); + ttlSerializer.serialize(ttlValue, serializedMigratedValueOutput); + } else if (TtlStateFactory.TtlSerializer.isTtlMapStateSerializer(newSerializer)) { + MapSerializer ttlMapSerializer = (MapSerializer) newSerializer; + Map ttlValueMap = getTtlValueMap(ttlTimeProvider, ttlMapSerializer, (Map) value); + ttlMapSerializer.serialize(ttlValueMap, serializedMigratedValueOutput); + } + } catch (Exception e) { + throw new StateMigrationException("Error while trying to migrate RocksDB state.", e); + } + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private static Map getTtlValueMap( + TtlTimeProvider ttlTimeProvider, MapSerializer ttlMapSerializer, Map valueMap) { + TtlStateFactory.TtlSerializer ttlValueSerializer = + (TtlStateFactory.TtlSerializer) ttlMapSerializer.getValueSerializer(); + Map ttlValueMap = new HashMap<>(); + valueMap.forEach( + (k, v) -> + ttlValueMap.put( + k, + ttlValueSerializer.createInstance( + ttlTimeProvider.currentTimestamp(), v))); + return ttlValueMap; + } + byte[] getKeyBytes() { return serializeCurrentKeyWithGroupAndNamespace(); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index cef331f2d67d60..9909c033edb4c0 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; @@ -62,6 +63,7 @@ import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper; import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig; +import org.apache.flink.runtime.state.ttl.TtlStateFactory; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkRuntimeException; @@ -770,48 +772,58 @@ RegisteredKeyValueStateBackendMetaInfo updateRestoredStateMetaInfo( // it anymore to improve the error message TypeSerializer previousStateSerializer = restoredKvStateMetaInfo.getStateSerializer(); - TypeSerializerSchemaCompatibility newStateSerializerCompatibility = - restoredKvStateMetaInfo.updateStateSerializer(stateSerializer); - if (newStateSerializerCompatibility.isCompatibleAfterMigration()) { - migrateStateValues(stateDesc, oldStateInfo); - } else if (newStateSerializerCompatibility.isIncompatible()) { - throw new StateMigrationException( - "The new state serializer (" - + stateSerializer - + ") must not be incompatible with the old state serializer (" - + previousStateSerializer - + ")."); + // migrate state from non-ttl state to ttl state + if (TtlStateFactory.TtlSerializer.isTtlStateSerializer(stateSerializer) + && !TtlStateFactory.TtlSerializer.isTtlStateSerializer(previousStateSerializer)) { + TypeSerializerSchemaCompatibility newStateSerializerCompatibility = + restoredKvStateMetaInfo.updateStateSerializer(stateSerializer); + if (!newStateSerializerCompatibility.isIncompatible()) { + migrateStateValuesWithTtl(stateDesc, oldStateInfo, stateDesc.getTtlConfig()); + + ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl( + restoredKvStateMetaInfo, + RocksDBOperationUtils.createColumnFamilyOptions( + columnFamilyOptionsFactory, stateDesc.getName())); + } + } else { + TypeSerializerSchemaCompatibility newStateSerializerCompatibility = + restoredKvStateMetaInfo.updateStateSerializer(stateSerializer); + if (newStateSerializerCompatibility.isCompatibleAfterMigration()) { + migrateStateValues(stateDesc, oldStateInfo); + } else if (newStateSerializerCompatibility.isIncompatible()) { + throw new StateMigrationException( + "The new state serializer (" + + stateSerializer + + ") must not be incompatible with the old state serializer (" + + previousStateSerializer + + ")."); + } } return restoredKvStateMetaInfo; } - /** - * Migrate only the state value, that is the "value" that is stored in RocksDB. We don't migrate - * the key here, which is made up of key group, key, namespace and map key (in case of - * MapState). - */ + /** Migrate state value from disabling ttl to enabling. */ @SuppressWarnings("unchecked") - private void migrateStateValues( + private void migrateStateValuesWithTtl( StateDescriptor stateDesc, - Tuple2> stateMetaInfo) + Tuple2> oldStateInfo, + StateTtlConfig ttlConfig) throws Exception { - if (stateDesc.getType() == StateDescriptor.Type.MAP) { TypeSerializerSnapshot previousSerializerSnapshot = - stateMetaInfo.f1.getPreviousStateSerializerSnapshot(); + oldStateInfo.f1.getPreviousStateSerializerSnapshot(); checkState( previousSerializerSnapshot != null, "the previous serializer snapshot should exist."); checkState( previousSerializerSnapshot instanceof MapSerializerSnapshot, "previous serializer snapshot should be a MapSerializerSnapshot."); + TypeSerializer newSerializer = stateDesc.getSerializer(); - TypeSerializer newSerializer = stateMetaInfo.f1.getStateSerializer(); checkState( newSerializer instanceof MapSerializer, "new serializer should be a MapSerializer."); - MapSerializer mapSerializer = (MapSerializer) newSerializer; MapSerializerSnapshot mapSerializerSnapshot = (MapSerializerSnapshot) previousSerializerSnapshot; @@ -828,18 +840,17 @@ private void migrateStateValues( // we need to get an actual state instance because migration is different // for different state types. For example, ListState needs to deal with // individual elements - State state = createState(stateDesc, stateMetaInfo); + State state = createState(stateDesc, oldStateInfo); if (!(state instanceof AbstractRocksDBState)) { throw new FlinkRuntimeException( "State should be an AbstractRocksDBState but is " + state); } - @SuppressWarnings("unchecked") AbstractRocksDBState rocksDBState = (AbstractRocksDBState) state; Snapshot rocksDBSnapshot = db.getSnapshot(); try (RocksIteratorWrapper iterator = - RocksDBOperationUtils.getRocksIterator(db, stateMetaInfo.f0, readOptions); + RocksDBOperationUtils.getRocksIterator(db, oldStateInfo.f0, readOptions); RocksDBWriteBatchWrapper batchWriter = new RocksDBWriteBatchWrapper(db, getWriteOptions(), getWriteBatchSize())) { iterator.seekToFirst(); @@ -849,14 +860,23 @@ private void migrateStateValues( while (iterator.isValid()) { serializedValueInput.setBuffer(iterator.value()); - rocksDBState.migrateSerializedValue( - serializedValueInput, - migratedSerializedValueOutput, - stateMetaInfo.f1.getPreviousStateSerializer(), - stateMetaInfo.f1.getStateSerializer()); + if (ttlConfig.isEnabled()) { + rocksDBState.migrateSerializedTtlValue( + serializedValueInput, + migratedSerializedValueOutput, + oldStateInfo.f1.getPreviousStateSerializer(), + oldStateInfo.f1.getStateSerializer(), + this.ttlTimeProvider); + } else { + rocksDBState.migrateSerializedValue( + serializedValueInput, + migratedSerializedValueOutput, + oldStateInfo.f1.getPreviousStateSerializer(), + oldStateInfo.f1.getStateSerializer()); + } batchWriter.put( - stateMetaInfo.f0, + oldStateInfo.f0, iterator.key(), migratedSerializedValueOutput.getCopyOfBuffer()); @@ -869,6 +889,20 @@ private void migrateStateValues( } } + /** + * Migrate only the state value, that is the "value" that is stored in RocksDB. We don't migrate + * the key here, which is made up of key group, key, namespace and map key (in case of + * MapState). + */ + @SuppressWarnings("unchecked") + private void migrateStateValues( + StateDescriptor stateDesc, + Tuple2> stateMetaInfo) + throws Exception { + + migrateStateValuesWithTtl(stateDesc, stateMetaInfo, StateTtlConfig.DISABLED); + } + @SuppressWarnings("unchecked") private static boolean checkMapStateKeySchemaCompatibility( MapSerializerSnapshot mapStateSerializerSnapshot, @@ -910,6 +944,7 @@ public IS createOrUpdateInternalStat namespaceSerializer, snapshotTransformFactory, allowFutureMetadataUpdates); + if (!allowFutureMetadataUpdates) { // Config compact filter only when no future metadata updates ttlCompactFiltersManager.configCompactFilter( diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java index b5339fd4d25af2..2c2b0fa49958c7 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java @@ -30,6 +30,9 @@ import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; import org.apache.flink.runtime.state.StateSnapshotTransformer; import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.runtime.state.ttl.TtlStateFactory; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.runtime.state.ttl.TtlValue; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StateMigrationException; @@ -226,6 +229,44 @@ public void migrateSerializedValue( } } + @Override + @SuppressWarnings({"unchecked"}) + public void migrateSerializedTtlValue( + DataInputDeserializer serializedOldValueInput, + DataOutputSerializer serializedMigratedValueOutput, + TypeSerializer> priorSerializer, + TypeSerializer> newSerializer, + TtlTimeProvider ttlTimeProvider) + throws StateMigrationException { + Preconditions.checkArgument(priorSerializer instanceof ListSerializer); + Preconditions.checkArgument( + TtlStateFactory.TtlSerializer.isTtlListStateSerializer(newSerializer)); + + TypeSerializer priorElementSerializer = + ((ListSerializer) priorSerializer).getElementSerializer(); + + TtlStateFactory.TtlSerializer ttlElementSerializer = + (TtlStateFactory.TtlSerializer) + ((ListSerializer) newSerializer).getElementSerializer(); + + try { + while (serializedOldValueInput.available() > 0) { + V element = + ListDelimitedSerializer.deserializeNextElement( + serializedOldValueInput, priorElementSerializer); + TtlValue ttlElement = + ttlElementSerializer.createInstance( + ttlTimeProvider.currentTimestamp(), element); + ttlElementSerializer.serialize(ttlElement, serializedMigratedValueOutput); + if (serializedOldValueInput.available() > 0) { + serializedMigratedValueOutput.write(DELIMITER); + } + } + } catch (Exception e) { + throw new StateMigrationException("Error while trying to migrate RocksDB state.", e); + } + } + @Override protected RocksDBListState setValueSerializer( TypeSerializer> valueSerializer) { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java index 58a58d8f1576d9..1800a2abbf2406 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java @@ -32,6 +32,9 @@ import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; import org.apache.flink.runtime.state.StateSnapshotTransformer; import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.runtime.state.ttl.TtlStateFactory; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.runtime.state.ttl.TtlValue; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StateMigrationException; @@ -250,6 +253,43 @@ public void migrateSerializedValue( } } + @Override + @SuppressWarnings({"unchecked"}) + public void migrateSerializedTtlValue( + DataInputDeserializer serializedOldValueInput, + DataOutputSerializer serializedMigratedValueOutput, + TypeSerializer> priorSerializer, + TypeSerializer> newSerializer, + TtlTimeProvider ttlTimeProvider) + throws StateMigrationException { + checkArgument(priorSerializer instanceof MapSerializer); + checkArgument(TtlStateFactory.TtlSerializer.isTtlMapStateSerializer(newSerializer)); + + TypeSerializer priorMapValueSerializer = + ((MapSerializer) priorSerializer).getValueSerializer(); + TtlStateFactory.TtlSerializer newMapValueSerializer = + (TtlStateFactory.TtlSerializer) + ((MapSerializer) newSerializer).getValueSerializer(); + + try { + boolean isNull = serializedOldValueInput.readBoolean(); + UV mapUserValue = null; + TtlValue ttlMapUserValue = null; + if (!isNull) { + mapUserValue = priorMapValueSerializer.deserialize(serializedOldValueInput); + ttlMapUserValue = + newMapValueSerializer.createInstance( + ttlTimeProvider.currentTimestamp(), mapUserValue); + } + + serializedMigratedValueOutput.writeBoolean(ttlMapUserValue == null); + newMapValueSerializer.serialize(ttlMapUserValue, serializedMigratedValueOutput); + } catch (Exception e) { + throw new StateMigrationException( + "Error while trying to migrate RocksDB map state.", e); + } + } + @Override public Iterator> iterator() { final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();