From f96c49c68f231cc6e000e3c4bb2f953bb635735a Mon Sep 17 00:00:00 2001 From: Xiangyu Feng Date: Mon, 7 Oct 2024 19:58:04 +0800 Subject: [PATCH] fix ut --- .../CompositeTypeSerializerSnapshot.java | 4 ++ .../CompositeTypeSerializerUtil.java | 19 +++++- .../state/heap/HeapKeyedStateBackend.java | 3 +- .../ttl/TtlAwareStateDescriptorFactory.java | 58 ++++++++++++++----- .../state/StateBackendMigrationTestBase.java | 4 +- .../runtime/state/StateBackendTestBase.java | 8 ++- ...eddedRocksDBStateBackendMigrationTest.java | 15 ++--- 7 files changed, 82 insertions(+), 29 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java index f96acd6af0c73..04b1953a0d4ac 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java @@ -425,6 +425,10 @@ private TypeSerializerSchemaCompatibility constructFinalSchemaCompatibilityRe return TypeSerializerSchemaCompatibility.compatibleAfterMigration(); } + if (nestedSerializersCompatibilityResult.isCompatibleAfterTtlMigration()) { + return TypeSerializerSchemaCompatibility.compatibleAfterTtlMigration(); + } + if (nestedSerializersCompatibilityResult.isCompatibleWithReconfiguredSerializer()) { TypeSerializer reconfiguredCompositeSerializer = createOuterSerializerWithNestedSerializers( diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtil.java index 8382daa20aca7..6a8820ab2a4b4 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtil.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtil.java @@ -97,6 +97,7 @@ public static IntermediateCompatibilityResult constructIntermediateCompat new TypeSerializer[newNestedSerializerSnapshots.length]; // check nested serializers for compatibility + boolean nestedSerializerRequiresTtlMigration = false; boolean nestedSerializerRequiresMigration = false; boolean hasReconfiguredNestedSerializers = false; for (int i = 0; i < oldNestedSerializerSnapshots.length; i++) { @@ -110,7 +111,9 @@ public static IntermediateCompatibilityResult constructIntermediateCompat return IntermediateCompatibilityResult.definedIncompatibleResult(); } - if (compatibility.isCompatibleAfterMigration()) { + if (compatibility.isCompatibleAfterTtlMigration()) { + nestedSerializerRequiresTtlMigration = true; + } else if (compatibility.isCompatibleAfterMigration()) { nestedSerializerRequiresMigration = true; } else if (compatibility.isCompatibleWithReconfiguredSerializer()) { hasReconfiguredNestedSerializers = true; @@ -122,6 +125,10 @@ public static IntermediateCompatibilityResult constructIntermediateCompat } } + if (nestedSerializerRequiresTtlMigration) { + return IntermediateCompatibilityResult.definedCompatibleAfterTtlMigrationResult(); + } + if (nestedSerializerRequiresMigration) { return IntermediateCompatibilityResult.definedCompatibleAfterMigrationResult(); } @@ -150,6 +157,11 @@ static IntermediateCompatibilityResult definedIncompatibleResult() { TypeSerializerSchemaCompatibility.Type.INCOMPATIBLE, null); } + static IntermediateCompatibilityResult definedCompatibleAfterTtlMigrationResult() { + return new IntermediateCompatibilityResult<>( + TypeSerializerSchemaCompatibility.Type.COMPATIBLE_AFTER_TTL_MIGRATION, null); + } + static IntermediateCompatibilityResult definedCompatibleAfterMigrationResult() { return new IntermediateCompatibilityResult<>( TypeSerializerSchemaCompatibility.Type.COMPATIBLE_AFTER_MIGRATION, null); @@ -184,6 +196,11 @@ public boolean isCompatibleAfterMigration() { == TypeSerializerSchemaCompatibility.Type.COMPATIBLE_AFTER_MIGRATION; } + public boolean isCompatibleAfterTtlMigration() { + return compatibilityType + == TypeSerializerSchemaCompatibility.Type.COMPATIBLE_AFTER_TTL_MIGRATION; + } + public boolean isIncompatible() { return compatibilityType == TypeSerializerSchemaCompatibility.Type.INCOMPATIBLE; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 67a1892ece95b..c1621ec5b0e58 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -247,7 +247,8 @@ private StateTable tryRegisterStateTable( .resolveSchemaCompatibility( previousStateSerializer.snapshotConfiguration()); - if (stateCompatibility.isIncompatible()) { + if (stateCompatibility.isIncompatible() + || stateCompatibility.isCompatibleAfterTtlMigration()) { throw new StateMigrationException( "For heap backends, the new state serializer (" + newStateSerializer diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAwareStateDescriptorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAwareStateDescriptorFactory.java index 552d2b63c30c4..b6a44f69baf89 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAwareStateDescriptorFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAwareStateDescriptorFactory.java @@ -70,60 +70,86 @@ public StateDescriptor getWrappedTtlAwareStateDescriptor() { @SuppressWarnings("unchecked") private StateDescriptor wrapValueDescriptor() { - return stateDesc.getSerializer() instanceof TtlAwareSerializer - ? stateDesc - : (StateDescriptor) + if (stateDesc.getSerializer() instanceof TtlAwareSerializer) { + return stateDesc; + } + + StateDescriptor ttlAwareStateDesc = + (StateDescriptor) new ValueStateDescriptor<>( stateDesc.getName(), - new TtlAwareSerializer<>(stateDesc.getSerializer())); + new TtlAwareSerializer<>(stateDesc.getSerializer()), + stateDesc.getDefaultValue()); + ttlAwareStateDesc.enableTimeToLive(stateDesc.getTtlConfig()); + return ttlAwareStateDesc; } @SuppressWarnings({"unchecked", "rawtypes"}) private StateDescriptor wrapListDescriptor() { ListStateDescriptor listStateDesc = (ListStateDescriptor) stateDesc; - return listStateDesc.getElementSerializer() instanceof TtlAwareSerializer - ? listStateDesc - : (StateDescriptor) + if (listStateDesc.getElementSerializer() instanceof TtlAwareSerializer) { + return listStateDesc; + } + + StateDescriptor ttlAwareStateDesc = + (StateDescriptor) new ListStateDescriptor<>( listStateDesc.getName(), new TtlAwareSerializer<>(listStateDesc.getElementSerializer())); + ttlAwareStateDesc.enableTimeToLive(stateDesc.getTtlConfig()); + return ttlAwareStateDesc; } @SuppressWarnings({"unchecked", "rawtypes"}) private StateDescriptor wrapMapDescriptor() { MapStateDescriptor mapStateDescriptor = (MapStateDescriptor) stateDesc; - return mapStateDescriptor.getValueSerializer() instanceof TtlAwareSerializer - ? mapStateDescriptor - : (StateDescriptor) + if (mapStateDescriptor.getValueSerializer() instanceof TtlAwareSerializer) { + return mapStateDescriptor; + } + + StateDescriptor ttlAwareStateDesc = + (StateDescriptor) new MapStateDescriptor<>( mapStateDescriptor.getName(), mapStateDescriptor.getKeySerializer(), new TtlAwareSerializer<>(mapStateDescriptor.getValueSerializer())); + ttlAwareStateDesc.enableTimeToLive(stateDesc.getTtlConfig()); + return ttlAwareStateDesc; } @SuppressWarnings({"unchecked", "rawtypes"}) private StateDescriptor wrapAggregatingDescriptor() { AggregatingStateDescriptor aggregatingStateDescriptor = (AggregatingStateDescriptor) stateDesc; - return aggregatingStateDescriptor.getSerializer() instanceof TtlAwareSerializer - ? aggregatingStateDescriptor - : (StateDescriptor) + if (aggregatingStateDescriptor.getSerializer() instanceof TtlAwareSerializer) { + return aggregatingStateDescriptor; + } + + StateDescriptor ttlAwareStateDesc = + (StateDescriptor) new AggregatingStateDescriptor<>( aggregatingStateDescriptor.getName(), aggregatingStateDescriptor.getAggregateFunction(), new TtlAwareSerializer<>( aggregatingStateDescriptor.getSerializer())); + ttlAwareStateDesc.enableTimeToLive(stateDesc.getTtlConfig()); + return ttlAwareStateDesc; } @SuppressWarnings({"unchecked", "rawtypes"}) private StateDescriptor wrapReducingDescriptor() { ReducingStateDescriptor reducingStateDesc = (ReducingStateDescriptor) stateDesc; - return reducingStateDesc.getSerializer() instanceof TtlAwareSerializer - ? reducingStateDesc - : (StateDescriptor) + if (reducingStateDesc.getSerializer() instanceof TtlAwareSerializer) { + return reducingStateDesc; + } + + StateDescriptor ttlAwareStateDesc = + (StateDescriptor) new ReducingStateDescriptor<>( reducingStateDesc.getName(), reducingStateDesc.getReduceFunction(), new TtlAwareSerializer<>(reducingStateDesc.getSerializer())); + ttlAwareStateDesc.enableTimeToLive(stateDesc.getTtlConfig()); + return ttlAwareStateDesc; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java index bffb7f20847da..415d21dbf30ad 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java @@ -1219,8 +1219,8 @@ protected void testStateMigrationAfterChangingTTLFromEnablingToDisabling() throw testKeyedValueStateUpgrade( initialAccessDescriptor, newAccessDescriptorAfterRestore)) .satisfiesAnyOf( - e -> assertThat(e).isInstanceOf(IllegalStateException.class), - e -> assertThat(e).hasCauseInstanceOf(IllegalStateException.class)); + e -> assertThat(e).isInstanceOf(StateMigrationException.class), + e -> assertThat(e).hasCauseInstanceOf(StateMigrationException.class)); } @TestTemplate diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 1bfc9d4237efd..c0128edee87c3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -74,6 +74,7 @@ import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.runtime.state.internal.InternalReducingState; import org.apache.flink.runtime.state.internal.InternalValueState; +import org.apache.flink.runtime.state.ttl.TtlAwareSerializer; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.runtime.testutils.statemigration.TestType; import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; @@ -1160,7 +1161,9 @@ void testKryoRestoreResilienceWithDifferentRegistrationOrder() throws Exception // have identical mappings InternalKvState internalKvState = (InternalKvState) state; KryoSerializer kryoSerializer = - (KryoSerializer) internalKvState.getValueSerializer(); + (KryoSerializer) + ((TtlAwareSerializer) internalKvState.getValueSerializer()) + .getOriginalTypeSerializer(); int mainPojoClassRegistrationId = kryoSerializer.getKryo().getRegistration(TestPojo.class).getId(); int nestedPojoClassARegistrationId = @@ -4848,7 +4851,8 @@ void testAsyncSnapshot() throws Exception { InternalValueState valueState = backend.createOrUpdateInternalState( VoidNamespaceSerializer.INSTANCE, - new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); + new ValueStateDescriptor<>( + "test", new TtlAwareSerializer<>(IntSerializer.INSTANCE))); valueState.setCurrentNamespace(VoidNamespace.INSTANCE); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendMigrationTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendMigrationTest.java index d5aedb4885155..d3d81768cb0ff 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendMigrationTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendMigrationTest.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.StateBackendMigrationTestBase; @@ -41,6 +40,7 @@ import java.io.IOException; import java.nio.file.Path; +import java.time.Duration; import java.util.Arrays; import java.util.List; @@ -100,7 +100,7 @@ protected void testStateMigrationAfterChangingTTLFromDisablingToEnabling() throw // restore with a V2 serializer that has a different schema new TestType.V2TestTypeSerializer()); newAccessDescriptorAfterRestore.enableTimeToLive( - StateTtlConfig.newBuilder(Time.days(1)).build()); + StateTtlConfig.newBuilder(Duration.ofDays(1)).build()); ListStateDescriptor initialAccessListDescriptor = new ListStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer()); @@ -110,7 +110,7 @@ protected void testStateMigrationAfterChangingTTLFromDisablingToEnabling() throw // restore with a V2 serializer that has a different schema new TestType.V2TestTypeSerializer()); newAccessListDescriptorAfterRestore.enableTimeToLive( - StateTtlConfig.newBuilder(Time.days(1)).build()); + StateTtlConfig.newBuilder(Duration.ofDays(1)).build()); MapStateDescriptor initialAccessMapDescriptor = new MapStateDescriptor<>( @@ -122,7 +122,7 @@ protected void testStateMigrationAfterChangingTTLFromDisablingToEnabling() throw // restore with a V2 serializer that has a different schema new TestType.V2TestTypeSerializer()); newAccessMapDescriptorAfterRestore.enableTimeToLive( - StateTtlConfig.newBuilder(Time.days(1)).build()); + StateTtlConfig.newBuilder(Duration.ofDays(1)).build()); testKeyedValueStateUpgrade(initialAccessDescriptor, newAccessDescriptorAfterRestore); testKeyedListStateUpgrade(initialAccessListDescriptor, newAccessListDescriptorAfterRestore); @@ -135,7 +135,8 @@ protected void testStateMigrationAfterChangingTTLFromEnablingToDisabling() throw ValueStateDescriptor initialAccessDescriptor = new ValueStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer()); - initialAccessDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(1)).build()); + initialAccessDescriptor.enableTimeToLive( + StateTtlConfig.newBuilder(Duration.ofDays(1)).build()); ValueStateDescriptor newAccessDescriptorAfterRestore = new ValueStateDescriptor<>( stateName, @@ -145,7 +146,7 @@ protected void testStateMigrationAfterChangingTTLFromEnablingToDisabling() throw ListStateDescriptor initialAccessListDescriptor = new ListStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer()); initialAccessListDescriptor.enableTimeToLive( - StateTtlConfig.newBuilder(Time.days(1)).build()); + StateTtlConfig.newBuilder(Duration.ofDays(1)).build()); ListStateDescriptor newAccessListDescriptorAfterRestore = new ListStateDescriptor<>( stateName, @@ -156,7 +157,7 @@ protected void testStateMigrationAfterChangingTTLFromEnablingToDisabling() throw new MapStateDescriptor<>( stateName, IntSerializer.INSTANCE, new TestType.V1TestTypeSerializer()); initialAccessMapDescriptor.enableTimeToLive( - StateTtlConfig.newBuilder(Time.days(1)).build()); + StateTtlConfig.newBuilder(Duration.ofDays(1)).build()); MapStateDescriptor newAccessMapDescriptorAfterRestore = new MapStateDescriptor<>( stateName,