Skip to content

Commit

Permalink
fix ut
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangyuf committed Oct 7, 2024
1 parent 616dfbb commit f96c49c
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,10 @@ private TypeSerializerSchemaCompatibility<T> constructFinalSchemaCompatibilityRe
return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
}

if (nestedSerializersCompatibilityResult.isCompatibleAfterTtlMigration()) {
return TypeSerializerSchemaCompatibility.compatibleAfterTtlMigration();
}

if (nestedSerializersCompatibilityResult.isCompatibleWithReconfiguredSerializer()) {
TypeSerializer<T> reconfiguredCompositeSerializer =
createOuterSerializerWithNestedSerializers(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public static <T> IntermediateCompatibilityResult<T> constructIntermediateCompat
new TypeSerializer[newNestedSerializerSnapshots.length];

// check nested serializers for compatibility
boolean nestedSerializerRequiresTtlMigration = false;
boolean nestedSerializerRequiresMigration = false;
boolean hasReconfiguredNestedSerializers = false;
for (int i = 0; i < oldNestedSerializerSnapshots.length; i++) {
Expand All @@ -110,7 +111,9 @@ public static <T> IntermediateCompatibilityResult<T> constructIntermediateCompat
return IntermediateCompatibilityResult.definedIncompatibleResult();
}

if (compatibility.isCompatibleAfterMigration()) {
if (compatibility.isCompatibleAfterTtlMigration()) {
nestedSerializerRequiresTtlMigration = true;
} else if (compatibility.isCompatibleAfterMigration()) {
nestedSerializerRequiresMigration = true;
} else if (compatibility.isCompatibleWithReconfiguredSerializer()) {
hasReconfiguredNestedSerializers = true;
Expand All @@ -122,6 +125,10 @@ public static <T> IntermediateCompatibilityResult<T> constructIntermediateCompat
}
}

if (nestedSerializerRequiresTtlMigration) {
return IntermediateCompatibilityResult.definedCompatibleAfterTtlMigrationResult();
}

if (nestedSerializerRequiresMigration) {
return IntermediateCompatibilityResult.definedCompatibleAfterMigrationResult();
}
Expand Down Expand Up @@ -150,6 +157,11 @@ static <T> IntermediateCompatibilityResult<T> definedIncompatibleResult() {
TypeSerializerSchemaCompatibility.Type.INCOMPATIBLE, null);
}

static <T> IntermediateCompatibilityResult<T> definedCompatibleAfterTtlMigrationResult() {
return new IntermediateCompatibilityResult<>(
TypeSerializerSchemaCompatibility.Type.COMPATIBLE_AFTER_TTL_MIGRATION, null);
}

static <T> IntermediateCompatibilityResult<T> definedCompatibleAfterMigrationResult() {
return new IntermediateCompatibilityResult<>(
TypeSerializerSchemaCompatibility.Type.COMPATIBLE_AFTER_MIGRATION, null);
Expand Down Expand Up @@ -184,6 +196,11 @@ public boolean isCompatibleAfterMigration() {
== TypeSerializerSchemaCompatibility.Type.COMPATIBLE_AFTER_MIGRATION;
}

public boolean isCompatibleAfterTtlMigration() {
return compatibilityType
== TypeSerializerSchemaCompatibility.Type.COMPATIBLE_AFTER_TTL_MIGRATION;
}

public boolean isIncompatible() {
return compatibilityType == TypeSerializerSchemaCompatibility.Type.INCOMPATIBLE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,8 @@ private <N, V> StateTable<K, N, V> tryRegisterStateTable(
.resolveSchemaCompatibility(
previousStateSerializer.snapshotConfiguration());

if (stateCompatibility.isIncompatible()) {
if (stateCompatibility.isIncompatible()
|| stateCompatibility.isCompatibleAfterTtlMigration()) {
throw new StateMigrationException(
"For heap backends, the new state serializer ("
+ newStateSerializer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,60 +70,86 @@ public StateDescriptor<S, SV> getWrappedTtlAwareStateDescriptor() {

@SuppressWarnings("unchecked")
private StateDescriptor<S, SV> wrapValueDescriptor() {
return stateDesc.getSerializer() instanceof TtlAwareSerializer
? stateDesc
: (StateDescriptor<S, SV>)
if (stateDesc.getSerializer() instanceof TtlAwareSerializer) {
return stateDesc;
}

StateDescriptor<S, SV> ttlAwareStateDesc =
(StateDescriptor<S, SV>)
new ValueStateDescriptor<>(
stateDesc.getName(),
new TtlAwareSerializer<>(stateDesc.getSerializer()));
new TtlAwareSerializer<>(stateDesc.getSerializer()),
stateDesc.getDefaultValue());
ttlAwareStateDesc.enableTimeToLive(stateDesc.getTtlConfig());
return ttlAwareStateDesc;
}

@SuppressWarnings({"unchecked", "rawtypes"})
private StateDescriptor<S, SV> wrapListDescriptor() {
ListStateDescriptor listStateDesc = (ListStateDescriptor) stateDesc;
return listStateDesc.getElementSerializer() instanceof TtlAwareSerializer
? listStateDesc
: (StateDescriptor<S, SV>)
if (listStateDesc.getElementSerializer() instanceof TtlAwareSerializer) {
return listStateDesc;
}

StateDescriptor<S, SV> ttlAwareStateDesc =
(StateDescriptor<S, SV>)
new ListStateDescriptor<>(
listStateDesc.getName(),
new TtlAwareSerializer<>(listStateDesc.getElementSerializer()));
ttlAwareStateDesc.enableTimeToLive(stateDesc.getTtlConfig());
return ttlAwareStateDesc;
}

@SuppressWarnings({"unchecked", "rawtypes"})
private StateDescriptor<S, SV> wrapMapDescriptor() {
MapStateDescriptor mapStateDescriptor = (MapStateDescriptor) stateDesc;
return mapStateDescriptor.getValueSerializer() instanceof TtlAwareSerializer
? mapStateDescriptor
: (StateDescriptor<S, SV>)
if (mapStateDescriptor.getValueSerializer() instanceof TtlAwareSerializer) {
return mapStateDescriptor;
}

StateDescriptor<S, SV> ttlAwareStateDesc =
(StateDescriptor<S, SV>)
new MapStateDescriptor<>(
mapStateDescriptor.getName(),
mapStateDescriptor.getKeySerializer(),
new TtlAwareSerializer<>(mapStateDescriptor.getValueSerializer()));
ttlAwareStateDesc.enableTimeToLive(stateDesc.getTtlConfig());
return ttlAwareStateDesc;
}

@SuppressWarnings({"unchecked", "rawtypes"})
private StateDescriptor<S, SV> wrapAggregatingDescriptor() {
AggregatingStateDescriptor aggregatingStateDescriptor =
(AggregatingStateDescriptor) stateDesc;
return aggregatingStateDescriptor.getSerializer() instanceof TtlAwareSerializer
? aggregatingStateDescriptor
: (StateDescriptor<S, SV>)
if (aggregatingStateDescriptor.getSerializer() instanceof TtlAwareSerializer) {
return aggregatingStateDescriptor;
}

StateDescriptor<S, SV> ttlAwareStateDesc =
(StateDescriptor<S, SV>)
new AggregatingStateDescriptor<>(
aggregatingStateDescriptor.getName(),
aggregatingStateDescriptor.getAggregateFunction(),
new TtlAwareSerializer<>(
aggregatingStateDescriptor.getSerializer()));
ttlAwareStateDesc.enableTimeToLive(stateDesc.getTtlConfig());
return ttlAwareStateDesc;
}

@SuppressWarnings({"unchecked", "rawtypes"})
private StateDescriptor<S, SV> wrapReducingDescriptor() {
ReducingStateDescriptor reducingStateDesc = (ReducingStateDescriptor) stateDesc;
return reducingStateDesc.getSerializer() instanceof TtlAwareSerializer
? reducingStateDesc
: (StateDescriptor<S, SV>)
if (reducingStateDesc.getSerializer() instanceof TtlAwareSerializer) {
return reducingStateDesc;
}

StateDescriptor<S, SV> ttlAwareStateDesc =
(StateDescriptor<S, SV>)
new ReducingStateDescriptor<>(
reducingStateDesc.getName(),
reducingStateDesc.getReduceFunction(),
new TtlAwareSerializer<>(reducingStateDesc.getSerializer()));
ttlAwareStateDesc.enableTimeToLive(stateDesc.getTtlConfig());
return ttlAwareStateDesc;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1219,8 +1219,8 @@ protected void testStateMigrationAfterChangingTTLFromEnablingToDisabling() throw
testKeyedValueStateUpgrade(
initialAccessDescriptor, newAccessDescriptorAfterRestore))
.satisfiesAnyOf(
e -> assertThat(e).isInstanceOf(IllegalStateException.class),
e -> assertThat(e).hasCauseInstanceOf(IllegalStateException.class));
e -> assertThat(e).isInstanceOf(StateMigrationException.class),
e -> assertThat(e).hasCauseInstanceOf(StateMigrationException.class));
}

@TestTemplate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.state.ttl.TtlAwareSerializer;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.testutils.statemigration.TestType;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
Expand Down Expand Up @@ -1160,7 +1161,9 @@ void testKryoRestoreResilienceWithDifferentRegistrationOrder() throws Exception
// have identical mappings
InternalKvState internalKvState = (InternalKvState) state;
KryoSerializer<TestPojo> kryoSerializer =
(KryoSerializer<TestPojo>) internalKvState.getValueSerializer();
(KryoSerializer<TestPojo>)
((TtlAwareSerializer<TestPojo>) internalKvState.getValueSerializer())
.getOriginalTypeSerializer();
int mainPojoClassRegistrationId =
kryoSerializer.getKryo().getRegistration(TestPojo.class).getId();
int nestedPojoClassARegistrationId =
Expand Down Expand Up @@ -4848,7 +4851,8 @@ void testAsyncSnapshot() throws Exception {
InternalValueState<Integer, VoidNamespace, Integer> valueState =
backend.createOrUpdateInternalState(
VoidNamespaceSerializer.INSTANCE,
new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
new ValueStateDescriptor<>(
"test", new TtlAwareSerializer<>(IntSerializer.INSTANCE)));

valueState.setCurrentNamespace(VoidNamespace.INSTANCE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.StateBackendMigrationTestBase;
Expand All @@ -41,6 +40,7 @@

import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;

Expand Down Expand Up @@ -100,7 +100,7 @@ protected void testStateMigrationAfterChangingTTLFromDisablingToEnabling() throw
// restore with a V2 serializer that has a different schema
new TestType.V2TestTypeSerializer());
newAccessDescriptorAfterRestore.enableTimeToLive(
StateTtlConfig.newBuilder(Time.days(1)).build());
StateTtlConfig.newBuilder(Duration.ofDays(1)).build());

ListStateDescriptor<TestType> initialAccessListDescriptor =
new ListStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer());
Expand All @@ -110,7 +110,7 @@ protected void testStateMigrationAfterChangingTTLFromDisablingToEnabling() throw
// restore with a V2 serializer that has a different schema
new TestType.V2TestTypeSerializer());
newAccessListDescriptorAfterRestore.enableTimeToLive(
StateTtlConfig.newBuilder(Time.days(1)).build());
StateTtlConfig.newBuilder(Duration.ofDays(1)).build());

MapStateDescriptor<Integer, TestType> initialAccessMapDescriptor =
new MapStateDescriptor<>(
Expand All @@ -122,7 +122,7 @@ protected void testStateMigrationAfterChangingTTLFromDisablingToEnabling() throw
// restore with a V2 serializer that has a different schema
new TestType.V2TestTypeSerializer());
newAccessMapDescriptorAfterRestore.enableTimeToLive(
StateTtlConfig.newBuilder(Time.days(1)).build());
StateTtlConfig.newBuilder(Duration.ofDays(1)).build());

testKeyedValueStateUpgrade(initialAccessDescriptor, newAccessDescriptorAfterRestore);
testKeyedListStateUpgrade(initialAccessListDescriptor, newAccessListDescriptorAfterRestore);
Expand All @@ -135,7 +135,8 @@ protected void testStateMigrationAfterChangingTTLFromEnablingToDisabling() throw

ValueStateDescriptor<TestType> initialAccessDescriptor =
new ValueStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer());
initialAccessDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(1)).build());
initialAccessDescriptor.enableTimeToLive(
StateTtlConfig.newBuilder(Duration.ofDays(1)).build());
ValueStateDescriptor<TestType> newAccessDescriptorAfterRestore =
new ValueStateDescriptor<>(
stateName,
Expand All @@ -145,7 +146,7 @@ protected void testStateMigrationAfterChangingTTLFromEnablingToDisabling() throw
ListStateDescriptor<TestType> initialAccessListDescriptor =
new ListStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer());
initialAccessListDescriptor.enableTimeToLive(
StateTtlConfig.newBuilder(Time.days(1)).build());
StateTtlConfig.newBuilder(Duration.ofDays(1)).build());
ListStateDescriptor<TestType> newAccessListDescriptorAfterRestore =
new ListStateDescriptor<>(
stateName,
Expand All @@ -156,7 +157,7 @@ protected void testStateMigrationAfterChangingTTLFromEnablingToDisabling() throw
new MapStateDescriptor<>(
stateName, IntSerializer.INSTANCE, new TestType.V1TestTypeSerializer());
initialAccessMapDescriptor.enableTimeToLive(
StateTtlConfig.newBuilder(Time.days(1)).build());
StateTtlConfig.newBuilder(Duration.ofDays(1)).build());
MapStateDescriptor<Integer, TestType> newAccessMapDescriptorAfterRestore =
new MapStateDescriptor<>(
stateName,
Expand Down

0 comments on commit f96c49c

Please sign in to comment.