From a53a1f3e99fce94b94eba68260f395a4599000ec Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Mon, 3 May 2021 09:40:12 +0200 Subject: [PATCH 1/5] [FLINK-22494][kubernetes] Introduces PossibleInconsistentStateException We experienced cases where the ConfigMap was updated but the corresponding HTTP request failed due to connectivity issues. PossibleInconsistentStateException is used to reflect cases where it's not clear whether the data was actually written or not. --- .../kubeclient/Fabric8FlinkKubeClient.java | 11 +++-- .../kubeclient/FlinkKubeClient.java | 8 ++- .../kubernetes/KubernetesClientTestBase.java | 35 +++++++++++-- .../kubernetes/MixedKubernetesServer.java | 6 ++- .../Fabric8FlinkKubeClientTest.java | 49 ++++++++++++++++++- .../PossibleInconsistentStateException.java | 38 ++++++++++++++ 6 files changed, 137 insertions(+), 10 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/persistence/PossibleInconsistentStateException.java diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java index 058945e882fc0..2b969eb4f4329 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java @@ -33,6 +33,7 @@ import org.apache.flink.kubernetes.utils.Constants; import org.apache.flink.kubernetes.utils.KubernetesUtils; import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.persistence.PossibleInconsistentStateException; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExecutorUtils; @@ -293,13 +294,17 @@ public CompletableFuture checkAndUpdateConfigMap( Throwable throwable) { LOG.debug( - "Failed to update ConfigMap {} with data {} because of concurrent " - + "modifications. 
Trying again.", + "Failed to update ConfigMap {} with data {}. Trying again.", configMap .getName(), configMap .getData()); - throw throwable; + // the + // client + // implementation does not expose the different kind of error causes to a degree that we could do a more fine-grained error handling here + throw new CompletionException( + new PossibleInconsistentStateException( + throwable)); } return true; }) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java index 6897ac1c4ece8..0606ef312ac70 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java @@ -24,6 +24,7 @@ import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch; +import org.apache.flink.runtime.persistence.PossibleInconsistentStateException; import java.util.List; import java.util.Map; @@ -153,7 +154,12 @@ KubernetesLeaderElector createLeaderElector( * one. If the returned optional is empty, we will not do the update. * @return Return the ConfigMap update future. The boolean result indicates whether the * ConfigMap is updated. The returned future will be completed exceptionally if the - * ConfigMap does not exist. + * ConfigMap does not exist. A failure during the update operation will result in the future + * failing with a {@link PossibleInconsistentStateException} indicating that no clear + * decision can be made on whether the update was successful or not. The {@code + * PossibleInconsistentStateException} not being present indicates that the failure happened + * before writing the updated ConfigMap to Kubernetes. 
For the latter case, it can be + * assumed that the ConfigMap was not updated. */ CompletableFuture checkAndUpdateConfigMap( String configMapName, diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java index b00c56d0b1d60..f66d62ca70a4a 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java @@ -31,10 +31,16 @@ import io.fabric8.kubernetes.api.model.ServicePortBuilder; import io.fabric8.kubernetes.api.model.ServiceStatus; import io.fabric8.kubernetes.api.model.ServiceStatusBuilder; +import io.fabric8.mockwebserver.dsl.DelayPathable; +import io.fabric8.mockwebserver.dsl.HttpMethodable; +import io.fabric8.mockwebserver.dsl.MockServerExpectation; +import io.fabric8.mockwebserver.dsl.ReturnOrWebsocketable; +import io.fabric8.mockwebserver.dsl.TimesOnceableOrHttpHeaderable; import javax.annotation.Nullable; import java.util.Collections; +import java.util.function.Function; /** * Base class for {@link KubernetesClusterDescriptorTest} and {@link @@ -53,14 +59,35 @@ protected void mockExpectedServiceFromServerSide(Service expectedService) { } protected void mockCreateConfigMapAlreadyExisting(ConfigMap configMap) { - final String path = String.format("/api/v1/namespaces/%s/configmaps", NAMESPACE); + final String path = + String.format( + "/api/%s/namespaces/%s/configmaps", + configMap.getApiVersion(), configMap.getMetadata().getNamespace()); server.expect().post().withPath(path).andReturn(500, configMap).always(); } + protected void mockGetConfigMapFailed(ConfigMap configMap) { + mockConfigMapRequest(configMap, HttpMethodable::get); + } + protected void mockReplaceConfigMapFailed(ConfigMap configMap) { - final String name = configMap.getMetadata().getName(); - final String path = 
String.format("/api/v1/namespaces/%s/configmaps/%s", NAMESPACE, name); - server.expect().put().withPath(path).andReturn(500, configMap).always(); + mockConfigMapRequest(configMap, HttpMethodable::put); + } + + private void mockConfigMapRequest( + ConfigMap configMap, + Function< + MockServerExpectation, + DelayPathable< + ReturnOrWebsocketable>>> + methodTypeSetter) { + final String path = + String.format( + "/api/%s/namespaces/%s/configmaps/%s", + configMap.getApiVersion(), + configMap.getMetadata().getNamespace(), + configMap.getMetadata().getName()); + methodTypeSetter.apply(server.expect()).withPath(path).andReturn(500, configMap).always(); } protected Service buildExternalServiceWithLoadBalancer( diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/MixedKubernetesServer.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/MixedKubernetesServer.java index acdf697c04e8d..c574563261003 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/MixedKubernetesServer.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/MixedKubernetesServer.java @@ -50,7 +50,7 @@ public MixedKubernetesServer(boolean https, boolean crudMode) { } public void before() { - HashMap> response = new HashMap<>(); + final HashMap> response = new HashMap<>(); mock = crudMode ? 
new KubernetesMockServer( @@ -77,6 +77,10 @@ public RecordedRequest takeRequest(long timeout, TimeUnit unit) throws Exception return mockWebServer.takeRequest(timeout, unit); } + public int getRequestCount() { + return mockWebServer.getRequestCount(); + } + public MockServerExpectation expect() { return mock.expect(); } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java index 9ed3a4e74036f..e81c01c5a3f35 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java @@ -34,8 +34,10 @@ import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; import org.apache.flink.kubernetes.kubeclient.resources.NoOpWatchCallbackHandler; +import org.apache.flink.runtime.persistence.PossibleInconsistentStateException; import org.apache.flink.runtime.rest.HttpMethodWrapper; import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.util.ExceptionUtils; import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.ConfigMapBuilder; @@ -432,6 +434,44 @@ public void testCheckAndUpdateConfigMapWhenConfigMapNotExist() { } } + @Test + public void testCheckAndUpdateConfigMapWhenGetConfigMapFailed() throws Exception { + final int configuredRetries = + flinkConfig.getInteger( + KubernetesConfigOptions.KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRIES); + final KubernetesConfigMap configMap = buildTestingConfigMap(); + this.flinkKubeClient.createConfigMap(configMap).get(); + + mockGetConfigMapFailed(configMap.getInternalResource()); + + final int initialRequestCount = server.getRequestCount(); + try { + this.flinkKubeClient + 
.checkAndUpdateConfigMap( + TESTING_CONFIG_MAP_NAME, + c -> { + throw new AssertionError( + "The replace operation should have never been triggered."); + }) + .get(); + fail( + "checkAndUpdateConfigMap should fail without a PossibleInconsistentStateException being the cause when number of retries has been exhausted."); + } catch (Exception ex) { + assertThat( + ex, + FlinkMatchers.containsMessage( + "Could not complete the " + + "operation. Number of retries has been exhausted.")); + final int actualRetryCount = server.getRequestCount() - initialRequestCount; + assertThat(actualRetryCount, is(configuredRetries + 1)); + assertThat( + "An error while retrieving the ConfigMap should not cause a PossibleInconsistentStateException.", + ExceptionUtils.findThrowable(ex, PossibleInconsistentStateException.class) + .isPresent(), + is(false)); + } + } + @Test public void testCheckAndUpdateConfigMapWhenReplaceConfigMapFailed() throws Exception { final int configuredRetries = @@ -453,7 +493,7 @@ public void testCheckAndUpdateConfigMapWhenReplaceConfigMapFailed() throws Excep }) .get(); fail( - "CheckAndUpdateConfigMap should fail with exception when number of retries has been exhausted."); + "checkAndUpdateConfigMap should fail due to a PossibleInconsistentStateException when number of retries has been exhausted."); } catch (Exception ex) { assertThat( ex, @@ -461,6 +501,12 @@ public void testCheckAndUpdateConfigMapWhenReplaceConfigMapFailed() throws Excep "Could not complete the " + "operation. 
Number of retries has been exhausted.")); assertThat(retries.get(), is(configuredRetries + 1)); + + assertThat( + "An error while replacing the ConfigMap should cause an PossibleInconsistentStateException.", + ExceptionUtils.findThrowable(ex, PossibleInconsistentStateException.class) + .isPresent(), + is(true)); } } @@ -501,6 +547,7 @@ private KubernetesConfigMap buildTestingConfigMap() { .withNewMetadata() .withName(TESTING_CONFIG_MAP_NAME) .withLabels(TESTING_LABELS) + .withNamespace(NAMESPACE) .endMetadata() .withData(data) .build()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/PossibleInconsistentStateException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/PossibleInconsistentStateException.java new file mode 100644 index 0000000000000..fccfde8508e74 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/PossibleInconsistentStateException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.persistence; + +import org.apache.flink.util.FlinkException; + +/** + * {@code PossibleInconsistentStateException} represents errors that might have led to an + * inconsistent state within the HA resources. + */ +public class PossibleInconsistentStateException extends FlinkException { + + private static final long serialVersionUID = 364105635349022882L; + + public PossibleInconsistentStateException(String message, Throwable cause) { + super(message, cause); + } + + public PossibleInconsistentStateException(Throwable cause) { + super(cause); + } +} From 95bd043f0a00b1ed56a90cf077b7cca0f5685333 Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Tue, 4 May 2021 13:08:07 +0200 Subject: [PATCH 2/5] [FLINK-22494][ha] Refactors TestingLongStateHandleHelper to operate on references The previous implementation stored the state in the StateHandle. This causes problems when deserializing the state, because deserialization creates a new instance that does not point to the actual state but is a copy of this state. This refactoring introduces LongStateHandle handling the actual state and LongRetrievableStateHandle referencing this handle. 
--- .../KubernetesStateHandleStoreITCase.java | 10 +- .../KubernetesStateHandleStoreTest.java | 373 ++++++++++-------- .../TestingLongStateHandleHelper.java | 104 +++-- .../ZooKeeperStateHandleStoreTest.java | 202 +++++----- 4 files changed, 397 insertions(+), 292 deletions(-) diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreITCase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreITCase.java index 009dc9815bcb1..75da66c890514 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreITCase.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreITCase.java @@ -68,8 +68,8 @@ public void testMultipleKubernetesStateHandleStores() throws Exception { new TestingLeaderCallbackHandler[leaderNum]; @SuppressWarnings("unchecked") - final KubernetesStateHandleStore[] stateHandleStores = - new KubernetesStateHandleStore[leaderNum]; + final KubernetesStateHandleStore[] + stateHandleStores = new KubernetesStateHandleStore[leaderNum]; try { for (int i = 0; i < leaderNum; i++) { @@ -103,17 +103,19 @@ public void testMultipleKubernetesStateHandleStores() throws Exception { if (leaderCallbackHandlers[i].getLockIdentity().equals(lockIdentity)) { expectedState = (long) i; } - stateHandleStores[i].addAndLock(KEY, (long) i); + stateHandleStores[i].addAndLock( + KEY, new TestingLongStateHandleHelper.LongStateHandle(i)); } // Only the leader could add successfully assertThat(expectedState, is(notNullValue())); assertThat(stateHandleStores[0].getAllAndLock().size(), is(1)); assertThat( - stateHandleStores[0].getAllAndLock().get(0).f0.retrieveState(), + stateHandleStores[0].getAllAndLock().get(0).f0.retrieveState().getValue(), is(expectedState)); assertThat(stateHandleStores[0].getAllAndLock().get(0).f1, is(KEY)); } finally { + 
TestingLongStateHandleHelper.clearGlobalState(); // Cleanup the resources for (int i = 0; i < leaderNum; i++) { if (leaderElectors[i] != null) { diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java index 339291d8e793f..6e230f5250fab 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java @@ -25,7 +25,6 @@ import org.apache.flink.runtime.persistence.StateHandleStore; import org.apache.flink.runtime.persistence.StringResourceVersion; import org.apache.flink.runtime.persistence.TestingLongStateHandleHelper; -import org.apache.flink.runtime.persistence.TestingLongStateHandleHelper.LongRetrievableStateHandle; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.function.FunctionUtils; @@ -36,6 +35,7 @@ import java.util.Comparator; import java.util.List; import java.util.function.Predicate; +import java.util.stream.Collectors; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.is; @@ -49,14 +49,16 @@ public class KubernetesStateHandleStoreTest extends KubernetesHighAvailabilityTe private static final String PREFIX = "test-prefix-"; private final String key = PREFIX + JobID.generate(); private final Predicate filter = k -> k.startsWith(PREFIX); - private final Long state = 12345L; + private final TestingLongStateHandleHelper.LongStateHandle state = + new TestingLongStateHandleHelper.LongStateHandle(12345L); - private TestingLongStateHandleHelper longStateStorage; + private final TestingLongStateHandleHelper longStateStorage = + new TestingLongStateHandleHelper(); @Before public void setup() { super.setup(); - longStateStorage = new 
TestingLongStateHandleHelper(); + TestingLongStateHandleHelper.clearGlobalState(); } @Test @@ -67,13 +69,15 @@ public void testAdd() throws Exception { () -> { leaderCallbackGrantLeadership(); - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); store.addAndLock(key, state); assertThat(store.getAllAndLock().size(), is(1)); assertThat(store.getAndLock(key).retrieveState(), is(state)); @@ -92,13 +96,15 @@ public void testAddAlreadyExistingKey() throws Exception { getLeaderConfigMap().getData().put(key, "existing data"); - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); try { store.addAndLock(key, state); @@ -110,12 +116,10 @@ public void testAddAlreadyExistingKey() throws Exception { key, LEADER_CONFIGMAP_NAME); assertThat(ex, FlinkMatchers.containsMessage(msg)); } - assertThat(longStateStorage.getStateHandles().size(), is(1)); + assertThat(TestingLongStateHandleHelper.getGlobalStorageSize(), is(1)); assertThat( - longStateStorage - .getStateHandles() - .get(0) - .getNumberOfDiscardCalls(), + TestingLongStateHandleHelper + .getDiscardCallCountForStateHandleByIndex(0), is(1)); }); } @@ -128,13 +132,15 @@ public void testAddFailedWhenConfigMapNotExistAndDiscardState() throws Exception { runTest( () -> { - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, 
- LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); try { store.addAndLock(key, state); @@ -146,12 +152,10 @@ public void testAddFailedWhenConfigMapNotExistAndDiscardState() throws Exception LEADER_CONFIGMAP_NAME); assertThat(ex, FlinkMatchers.containsMessage(msg)); } - assertThat(longStateStorage.getStateHandles().size(), is(1)); + assertThat(TestingLongStateHandleHelper.getGlobalStorageSize(), is(1)); assertThat( - longStateStorage - .getStateHandles() - .get(0) - .getNumberOfDiscardCalls(), + TestingLongStateHandleHelper + .getDiscardCallCountForStateHandleByIndex(0), is(1)); }); } @@ -166,17 +170,20 @@ public void testReplace() throws Exception { () -> { leaderCallbackGrantLeadership(); - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); store.addAndLock(key, state); - final Long newState = 23456L; + final TestingLongStateHandleHelper.LongStateHandle newState = + new TestingLongStateHandleHelper.LongStateHandle(23456L); final StringResourceVersion resourceVersion = store.exists(key); store.replace(key, resourceVersion, newState); @@ -195,14 +202,17 @@ public void testReplaceWithKeyNotExist() throws Exception { () -> { leaderCallbackGrantLeadership(); - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); - final Long newState = 23456L; + final KubernetesStateHandleStore< + 
TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); + final TestingLongStateHandleHelper.LongStateHandle newState = + new TestingLongStateHandleHelper.LongStateHandle(23456L); try { assertThat( @@ -230,14 +240,17 @@ public void testReplaceWithNoLeadershipAndDiscardState() throws Exception { () -> { leaderCallbackGrantLeadership(); - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); - final Long newState = 23456L; + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); + final TestingLongStateHandleHelper.LongStateHandle newState = + new TestingLongStateHandleHelper.LongStateHandle(23456L); store.addAndLock(key, state); // Lost leadership @@ -253,18 +266,14 @@ public void testReplaceWithNoLeadershipAndDiscardState() throws Exception { // The state do not change assertThat(store.getAndLock(key).retrieveState(), is(state)); - assertThat(longStateStorage.getStateHandles().size(), is(2)); + assertThat(TestingLongStateHandleHelper.getGlobalStorageSize(), is(2)); assertThat( - longStateStorage - .getStateHandles() - .get(0) - .getNumberOfDiscardCalls(), + TestingLongStateHandleHelper + .getDiscardCallCountForStateHandleByIndex(0), is(0)); assertThat( - longStateStorage - .getStateHandles() - .get(1) - .getNumberOfDiscardCalls(), + TestingLongStateHandleHelper + .getDiscardCallCountForStateHandleByIndex(1), is(1)); }); } @@ -280,13 +289,15 @@ public void testReplaceFailedAndDiscardState() throws Exception { () -> { leaderCallbackGrantLeadership(); - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - 
LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); store.addAndLock(key, state); final FlinkKubeClient anotherFlinkKubeClient = @@ -296,15 +307,18 @@ public void testReplaceFailedAndDiscardState() throws Exception { throw updateException; }) .build(); - final KubernetesStateHandleStore anotherStore = - new KubernetesStateHandleStore<>( - anotherFlinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); - - final Long newState = 23456L; + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + anotherStore = + new KubernetesStateHandleStore<>( + anotherFlinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); + + final TestingLongStateHandleHelper.LongStateHandle newState = + new TestingLongStateHandleHelper.LongStateHandle(23456L); final StringResourceVersion resourceVersion = anotherStore.exists(key); assertThat(resourceVersion.isExisting(), is(true)); try { @@ -318,18 +332,14 @@ public void testReplaceFailedAndDiscardState() throws Exception { // The state do not change assertThat(anotherStore.getAndLock(key).retrieveState(), is(state)); - assertThat(longStateStorage.getStateHandles().size(), is(2)); + assertThat(TestingLongStateHandleHelper.getGlobalStorageSize(), is(2)); assertThat( - longStateStorage - .getStateHandles() - .get(0) - .getNumberOfDiscardCalls(), + TestingLongStateHandleHelper + .getDiscardCallCountForStateHandleByIndex(0), is(0)); assertThat( - longStateStorage - .getStateHandles() - .get(1) - .getNumberOfDiscardCalls(), + TestingLongStateHandleHelper + .getDiscardCallCountForStateHandleByIndex(1), is(1)); }); } @@ -344,13 +354,15 @@ public void testGetAndExist() throws Exception { () -> { 
leaderCallbackGrantLeadership(); - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); store.addAndLock(key, state); assertThat( store.exists(key), @@ -369,13 +381,15 @@ public void testGetNonExistingKey() throws Exception { () -> { leaderCallbackGrantLeadership(); - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); final String nonExistingKey = "non-existing-key"; store.addAndLock(key, state); assertThat( @@ -404,26 +418,37 @@ public void testGetAll() throws Exception { () -> { leaderCallbackGrantLeadership(); - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); final List expected = Arrays.asList(3L, 2L, 1L); for (Long each : expected) { - store.addAndLock(key + each, each); + store.addAndLock( + key + each, + new TestingLongStateHandleHelper.LongStateHandle(each)); } - final Long[] actual = + final TestingLongStateHandleHelper.LongStateHandle[] actual = store.getAllAndLock().stream() .map( FunctionUtils.uncheckedFunction( e -> e.f0.retrieveState())) - 
.toArray(Long[]::new); + .toArray( + TestingLongStateHandleHelper.LongStateHandle[] + ::new); assertThat( - Arrays.asList(actual), containsInAnyOrder(expected.toArray())); + Arrays.stream(actual) + .map( + TestingLongStateHandleHelper.LongStateHandle + ::getValue) + .collect(Collectors.toList()), + containsInAnyOrder(expected.toArray())); }); } }; @@ -437,13 +462,15 @@ public void testGetAllHandles() throws Exception { () -> { leaderCallbackGrantLeadership(); - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); final List expected = Arrays.asList(key + 3, key + 2, key + 1); for (String each : expected) { @@ -465,24 +492,23 @@ public void testRemove() throws Exception { runTest( () -> { leaderCallbackGrantLeadership(); - LongRetrievableStateHandle.clearNumberOfGlobalDiscardCalls(); - - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); + + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); store.addAndLock(key, state); assertThat(store.getAllAndLock().size(), is(1)); assertThat(store.releaseAndTryRemove(key), is(true)); assertThat(store.getAllAndLock().size(), is(0)); // State should also be discarded. 
- assertThat( - LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls(), - is(1)); + assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(1)); }); } }; @@ -495,15 +521,16 @@ public void testRemoveFailedShouldNotDiscardState() throws Exception { runTest( () -> { leaderCallbackGrantLeadership(); - LongRetrievableStateHandle.clearNumberOfGlobalDiscardCalls(); - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); store.addAndLock(key, state); // Lost leadership @@ -516,9 +543,7 @@ public void testRemoveFailedShouldNotDiscardState() throws Exception { assertThat(store.releaseAndTryRemove(key), is(false)); assertThat(store.getAllAndLock().size(), is(1)); - assertThat( - LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls(), - is(0)); + assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(0)); }); } }; @@ -531,23 +556,24 @@ public void testRemoveAllHandlesAndDiscardState() throws Exception { runTest( () -> { leaderCallbackGrantLeadership(); - LongRetrievableStateHandle.clearNumberOfGlobalDiscardCalls(); - - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); + + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); store.addAndLock(key, state); - store.addAndLock(key + "1", 2L); + store.addAndLock( + key + "1", + new TestingLongStateHandleHelper.LongStateHandle(2L)); assertThat(store.getAllAndLock().size(), is(2)); 
store.releaseAndTryRemoveAll(); assertThat(store.getAllAndLock().size(), is(0)); - assertThat( - LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls(), - is(2)); + assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(2)); }); } }; @@ -560,26 +586,27 @@ public void testRemoveAllHandles() throws Exception { runTest( () -> { leaderCallbackGrantLeadership(); - LongRetrievableStateHandle.clearNumberOfGlobalDiscardCalls(); final String anotherKey = "key-not-with-prefix"; getLeaderConfigMap().getData().put(anotherKey, "value"); - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); store.addAndLock(key, state); - store.addAndLock(key + "1", 2L); + store.addAndLock( + key + "1", + new TestingLongStateHandleHelper.LongStateHandle(2L)); assertThat(store.getAllAndLock().size(), is(2)); store.clearEntries(); assertThat(store.getAllAndLock().size(), is(0)); - assertThat( - LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls(), - is(0)); + assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(0)); // Should only remove the key with specified prefix. 
assertThat( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/persistence/TestingLongStateHandleHelper.java b/flink-runtime/src/test/java/org/apache/flink/runtime/persistence/TestingLongStateHandleHelper.java index 4010391c1e249..fd387b2488127 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/persistence/TestingLongStateHandleHelper.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/persistence/TestingLongStateHandleHelper.java @@ -19,72 +19,122 @@ package org.apache.flink.runtime.persistence; import org.apache.flink.runtime.state.RetrievableStateHandle; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.util.AbstractID; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.StringJoiner; /** * Testing implementation for {@link RetrievableStateStorageHelper} and {@link * RetrievableStateHandle} with type {@link Long}. */ -public class TestingLongStateHandleHelper implements RetrievableStateStorageHelper { +public class TestingLongStateHandleHelper + implements RetrievableStateStorageHelper { - private final List stateHandles = new ArrayList<>(); + private static final List STATE_STORAGE = new ArrayList<>(); @Override - public RetrievableStateHandle store(Long state) { - final LongRetrievableStateHandle stateHandle = new LongRetrievableStateHandle(state); - stateHandles.add(stateHandle); + public RetrievableStateHandle store(LongStateHandle state) { + final int pos = STATE_STORAGE.size(); + STATE_STORAGE.add(state); - return stateHandle; + return new LongRetrievableStateHandle(pos); } - public List getStateHandles() { - return stateHandles; + public static LongStateHandle createState(long value) { + return new LongStateHandle(value); } - /** Testing {@link RetrievableStateStorageHelper} implementation with {@link Long}. 
*/ - public static class LongRetrievableStateHandle implements RetrievableStateHandle { + public static long getStateHandleValueByIndex(int index) { + return STATE_STORAGE.get(index).getValue(); + } - private static final long serialVersionUID = -3555329254423838912L; + public static int getDiscardCallCountForStateHandleByIndex(int index) { + return STATE_STORAGE.get(index).getNumberOfDiscardCalls(); + } - private static AtomicInteger numberOfGlobalDiscardCalls = new AtomicInteger(0); + public static int getGlobalStorageSize() { + return STATE_STORAGE.size(); + } + + public static void clearGlobalState() { + STATE_STORAGE.clear(); + } + + public static int getGlobalDiscardCount() { + return STATE_STORAGE.stream().mapToInt(LongStateHandle::getNumberOfDiscardCalls).sum(); + } - private final Long state; + /** + * {@code LongStateHandle} implements {@link StateObject} to monitor the {@link + * StateObject#discardState()} calls. + */ + public static class LongStateHandle implements StateObject { + + private static final long serialVersionUID = -5752042587113549855L; + + private final Long value; private int numberOfDiscardCalls = 0; - public LongRetrievableStateHandle(Long state) { - this.state = state; + public LongStateHandle(long value) { + this.value = value; } - @Override - public Long retrieveState() { - return state; + public long getValue() { + return value; } @Override public void discardState() { - numberOfGlobalDiscardCalls.incrementAndGet(); numberOfDiscardCalls++; } + public int getNumberOfDiscardCalls() { + return numberOfDiscardCalls; + } + @Override public long getStateSize() { - return 0; + return 8L; } - public int getNumberOfDiscardCalls() { - return numberOfDiscardCalls; + @Override + public String toString() { + return new StringJoiner(", ", LongStateHandle.class.getSimpleName() + "[", "]") + .add("value=" + value) + .add("numberOfDiscardCalls=" + numberOfDiscardCalls) + .toString(); } + } + + /** Testing {@link RetrievableStateStorageHelper} 
implementation with {@link Long}. */ + public static class LongRetrievableStateHandle + implements RetrievableStateHandle { - public static int getNumberOfGlobalDiscardCalls() { - return numberOfGlobalDiscardCalls.get(); + private static final long serialVersionUID = -3555329254423838912L; + + private final int stateReference; + + public LongRetrievableStateHandle(int stateReference) { + this.stateReference = stateReference; } - public static void clearNumberOfGlobalDiscardCalls() { - numberOfGlobalDiscardCalls.set(0); + @Override + public LongStateHandle retrieveState() { + return STATE_STORAGE.get(stateReference); + } + + @Override + public void discardState() { + STATE_STORAGE.get(stateReference).discardState(); + } + + @Override + public long getStateSize() { + return AbstractID.SIZE; } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java index c89e9d5acb3fd..e9f0bd249ae09 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java @@ -24,7 +24,6 @@ import org.apache.flink.runtime.persistence.IntegerResourceVersion; import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper; import org.apache.flink.runtime.persistence.TestingLongStateHandleHelper; -import org.apache.flink.runtime.persistence.TestingLongStateHandleHelper.LongRetrievableStateHandle; import org.apache.flink.runtime.state.RetrievableStateHandle; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.InstantiationUtil; @@ -82,26 +81,27 @@ public static void tearDown() throws Exception { @Before public void cleanUp() throws Exception { ZOOKEEPER.deleteAll(); + TestingLongStateHandleHelper.clearGlobalState(); } /** Tests add operation with lock. 
*/ @Test public void testAddAndLock() throws Exception { final TestingLongStateHandleHelper longStateStorage = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore store = + ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), longStateStorage); // Config final String pathInZooKeeper = "/testAdd"; - final Long state = 1239712317L; + final long state = 1239712317L; // Test - store.addAndLock(pathInZooKeeper, state); + store.addAndLock(pathInZooKeeper, new TestingLongStateHandleHelper.LongStateHandle(state)); // Verify // State handle created assertEquals(1, store.getAllAndLock().size()); - assertEquals(state, store.getAndLock(pathInZooKeeper).retrieveState()); + assertEquals(state, store.getAndLock(pathInZooKeeper).retrieveState().getValue()); // Path created and is persistent Stat stat = ZOOKEEPER.getClient().checkExists().forPath(pathInZooKeeper); @@ -121,12 +121,13 @@ public void testAddAndLock() throws Exception { // Data is equal @SuppressWarnings("unchecked") - Long actual = - ((RetrievableStateHandle) + final long actual = + ((RetrievableStateHandle) InstantiationUtil.deserializeObject( ZOOKEEPER.getClient().getData().forPath(pathInZooKeeper), ClassLoader.getSystemClassLoader())) - .retrieveState(); + .retrieveState() + .getValue(); assertEquals(state, actual); } @@ -136,23 +137,25 @@ public void testAddAndLock() throws Exception { public void testAddAlreadyExistingPath() throws Exception { final TestingLongStateHandleHelper stateHandleProvider = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore store = + ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), stateHandleProvider); ZOOKEEPER.getClient().create().forPath("/testAddAlreadyExistingPath"); - store.addAndLock("/testAddAlreadyExistingPath", 1L); + store.addAndLock( + "/testAddAlreadyExistingPath", + new TestingLongStateHandleHelper.LongStateHandle(1L)); // writing to the state storage should have succeeded 
- assertEquals(1, stateHandleProvider.getStateHandles()); + assertEquals(1, TestingLongStateHandleHelper.getGlobalStorageSize()); // the created state handle should have been cleaned up if the add operation failed - assertEquals(1, stateHandleProvider.getStateHandles().get(0).getNumberOfDiscardCalls()); + assertEquals(1, TestingLongStateHandleHelper.getDiscardCallCountForStateHandleByIndex(0)); } /** Tests that the created state handle is discarded if ZooKeeper create fails. */ @Test - public void testAddDiscardStateHandleAfterFailure() throws Exception { + public void testAddDiscardStateHandleAfterFailure() { // Setup final TestingLongStateHandleHelper stateHandleProvider = new TestingLongStateHandleHelper(); @@ -160,25 +163,26 @@ public void testAddDiscardStateHandleAfterFailure() throws Exception { when(client.inTransaction().create()) .thenThrow(new RuntimeException("Expected test Exception.")); - ZooKeeperStateHandleStore store = + ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>(client, stateHandleProvider); // Config final String pathInZooKeeper = "/testAddDiscardStateHandleAfterFailure"; - final Long state = 81282227L; + final long state = 81282227L; try { // Test - store.addAndLock(pathInZooKeeper, state); + store.addAndLock( + pathInZooKeeper, new TestingLongStateHandleHelper.LongStateHandle(state)); fail("Did not throw expected exception"); } catch (Exception ignored) { } // Verify // State handle created and discarded - assertEquals(1, stateHandleProvider.getStateHandles().size()); - assertEquals(state, stateHandleProvider.getStateHandles().get(0).retrieveState()); - assertEquals(1, stateHandleProvider.getStateHandles().get(0).getNumberOfDiscardCalls()); + assertEquals(1, TestingLongStateHandleHelper.getGlobalStorageSize()); + assertEquals(state, TestingLongStateHandleHelper.getStateHandleValueByIndex(0)); + assertEquals(1, TestingLongStateHandleHelper.getDiscardCallCountForStateHandleByIndex(0)); } /** Tests that a state handle is 
replaced. */ @@ -187,23 +191,27 @@ public void testReplace() throws Exception { // Setup final TestingLongStateHandleHelper stateHandleProvider = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore store = + ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), stateHandleProvider); // Config final String pathInZooKeeper = "/testReplace"; - final Long initialState = 30968470898L; - final Long replaceState = 88383776661L; + final long initialState = 30968470898L; + final long replaceState = 88383776661L; // Test - store.addAndLock(pathInZooKeeper, initialState); - store.replace(pathInZooKeeper, IntegerResourceVersion.valueOf(0), replaceState); + store.addAndLock( + pathInZooKeeper, new TestingLongStateHandleHelper.LongStateHandle(initialState)); + store.replace( + pathInZooKeeper, + IntegerResourceVersion.valueOf(0), + new TestingLongStateHandleHelper.LongStateHandle(replaceState)); // Verify // State handles created - assertEquals(2, stateHandleProvider.getStateHandles().size()); - assertEquals(initialState, stateHandleProvider.getStateHandles().get(0).retrieveState()); - assertEquals(replaceState, stateHandleProvider.getStateHandles().get(1).retrieveState()); + assertEquals(2, TestingLongStateHandleHelper.getGlobalStorageSize()); + assertEquals(initialState, TestingLongStateHandleHelper.getStateHandleValueByIndex(0)); + assertEquals(replaceState, TestingLongStateHandleHelper.getStateHandleValueByIndex(1)); // Path created and is persistent Stat stat = ZOOKEEPER.getClient().checkExists().forPath(pathInZooKeeper); @@ -212,12 +220,13 @@ public void testReplace() throws Exception { // Data is equal @SuppressWarnings("unchecked") - Long actual = - ((RetrievableStateHandle) + final long actual = + ((RetrievableStateHandle) InstantiationUtil.deserializeObject( ZOOKEEPER.getClient().getData().forPath(pathInZooKeeper), ClassLoader.getSystemClassLoader())) - .retrieveState(); + .retrieveState() + .getValue(); 
assertEquals(replaceState, actual); } @@ -225,12 +234,16 @@ public void testReplace() throws Exception { /** Tests that a non existing path throws an Exception. */ @Test(expected = Exception.class) public void testReplaceNonExistingPath() throws Exception { - final RetrievableStateStorageHelper stateStorage = new TestingLongStateHandleHelper(); + final RetrievableStateStorageHelper + stateStorage = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore store = + ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), stateStorage); - store.replace("/testReplaceNonExistingPath", IntegerResourceVersion.valueOf(0), 1L); + store.replace( + "/testReplaceNonExistingPath", + IntegerResourceVersion.valueOf(0), + new TestingLongStateHandleHelper.LongStateHandle(1L)); } /** Tests that the replace state handle is discarded if ZooKeeper setData fails. */ @@ -242,38 +255,43 @@ public void testReplaceDiscardStateHandleAfterFailure() throws Exception { CuratorFramework client = spy(ZOOKEEPER.getClient()); when(client.setData()).thenThrow(new RuntimeException("Expected test Exception.")); - ZooKeeperStateHandleStore store = + ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>(client, stateHandleProvider); // Config final String pathInZooKeeper = "/testReplaceDiscardStateHandleAfterFailure"; - final Long initialState = 30968470898L; - final Long replaceState = 88383776661L; + final long initialState = 30968470898L; + final long replaceState = 88383776661L; // Test - store.addAndLock(pathInZooKeeper, initialState); + store.addAndLock( + pathInZooKeeper, new TestingLongStateHandleHelper.LongStateHandle(initialState)); try { - store.replace(pathInZooKeeper, IntegerResourceVersion.valueOf(0), replaceState); + store.replace( + pathInZooKeeper, + IntegerResourceVersion.valueOf(0), + new TestingLongStateHandleHelper.LongStateHandle(replaceState)); fail("Did not throw expected exception"); } catch (Exception ignored) { } // Verify // 
State handle created and discarded - assertEquals(2, stateHandleProvider.getStateHandles().size()); - assertEquals(initialState, stateHandleProvider.getStateHandles().get(0).retrieveState()); - assertEquals(replaceState, stateHandleProvider.getStateHandles().get(1).retrieveState()); - assertEquals(1, stateHandleProvider.getStateHandles().get(1).getNumberOfDiscardCalls()); + assertEquals(2, TestingLongStateHandleHelper.getGlobalStorageSize()); + assertEquals(initialState, TestingLongStateHandleHelper.getStateHandleValueByIndex(0)); + assertEquals(replaceState, TestingLongStateHandleHelper.getStateHandleValueByIndex(1)); + assertEquals(1, TestingLongStateHandleHelper.getDiscardCallCountForStateHandleByIndex(1)); // Initial value @SuppressWarnings("unchecked") - Long actual = - ((RetrievableStateHandle) + final long actual = + ((RetrievableStateHandle) InstantiationUtil.deserializeObject( ZOOKEEPER.getClient().getData().forPath(pathInZooKeeper), ClassLoader.getSystemClassLoader())) - .retrieveState(); + .retrieveState() + .getValue(); assertEquals(initialState, actual); } @@ -284,21 +302,22 @@ public void testGetAndExists() throws Exception { // Setup final TestingLongStateHandleHelper stateHandleProvider = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore store = + ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), stateHandleProvider); // Config final String pathInZooKeeper = "/testGetAndExists"; - final Long state = 311222268470898L; + final long state = 311222268470898L; // Test assertThat(store.exists(pathInZooKeeper).isExisting(), is(false)); - store.addAndLock(pathInZooKeeper, state); - RetrievableStateHandle actual = store.getAndLock(pathInZooKeeper); + store.addAndLock(pathInZooKeeper, new TestingLongStateHandleHelper.LongStateHandle(state)); + RetrievableStateHandle actual = + store.getAndLock(pathInZooKeeper); // Verify - assertEquals(state, actual.retrieveState()); + assertEquals(state, 
actual.retrieveState().getValue()); assertTrue(store.exists(pathInZooKeeper).getValue() >= 0); } @@ -307,7 +326,7 @@ public void testGetAndExists() throws Exception { public void testGetNonExistingPath() throws Exception { final TestingLongStateHandleHelper stateHandleProvider = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore store = + ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), stateHandleProvider); store.getAndLock("/testGetNonExistingPath"); @@ -319,7 +338,7 @@ public void testGetAll() throws Exception { // Setup final TestingLongStateHandleHelper stateHandleProvider = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore store = + ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), stateHandleProvider); // Config @@ -333,11 +352,13 @@ public void testGetAll() throws Exception { // Test for (long val : expected) { - store.addAndLock(pathInZooKeeper + val, val); + store.addAndLock( + pathInZooKeeper + val, new TestingLongStateHandleHelper.LongStateHandle(val)); } - for (Tuple2, String> val : store.getAllAndLock()) { - assertTrue(expected.remove(val.f0.retrieveState())); + for (Tuple2, String> + val : store.getAllAndLock()) { + assertTrue(expected.remove(val.f0.retrieveState().getValue())); } assertEquals(0, expected.size()); } @@ -348,7 +369,7 @@ public void testGetAllSortedByName() throws Exception { // Setup final TestingLongStateHandleHelper stateHandleProvider = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore store = + ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), stateHandleProvider); // Config @@ -359,10 +380,12 @@ public void testGetAllSortedByName() throws Exception { // Test for (long val : expected) { final String pathInZooKeeper = String.format("%s%016d", basePath, val); - store.addAndLock(pathInZooKeeper, val); + store.addAndLock( + pathInZooKeeper, new 
TestingLongStateHandleHelper.LongStateHandle(val)); } - List, String>> actual = store.getAllAndLock(); + List, String>> + actual = store.getAllAndLock(); assertEquals(expected.length, actual.size()); // bring the elements in sort order @@ -370,7 +393,7 @@ public void testGetAllSortedByName() throws Exception { Collections.sort(actual, Comparator.comparing(o -> o.f1)); for (int i = 0; i < expected.length; i++) { - assertEquals(expected[i], actual.get(i).f0.retrieveState()); + assertEquals(expected[i], (Long) actual.get(i).f0.retrieveState().getValue()); } } @@ -380,17 +403,16 @@ public void testRemove() throws Exception { // Setup final TestingLongStateHandleHelper stateHandleProvider = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore store = + ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), stateHandleProvider); // Config final String pathInZooKeeper = "/testRemove"; - final Long state = 27255442L; - store.addAndLock(pathInZooKeeper, state); + store.addAndLock( + pathInZooKeeper, new TestingLongStateHandleHelper.LongStateHandle(27255442L)); - final int numberOfGlobalDiscardCalls = - LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls(); + final int numberOfGlobalDiscardCalls = TestingLongStateHandleHelper.getGlobalDiscardCount(); // Test store.releaseAndTryRemove(pathInZooKeeper); @@ -399,7 +421,7 @@ public void testRemove() throws Exception { assertEquals(0, ZOOKEEPER.getClient().getChildren().forPath("/").size()); assertEquals( numberOfGlobalDiscardCalls + 1, - LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls()); + TestingLongStateHandleHelper.getGlobalDiscardCount()); } /** Tests that all state handles are correctly discarded. 
*/ @@ -408,7 +430,7 @@ public void testReleaseAndTryRemoveAll() throws Exception { // Setup final TestingLongStateHandleHelper stateHandleProvider = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore store = + ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), stateHandleProvider); // Config @@ -422,7 +444,8 @@ public void testReleaseAndTryRemoveAll() throws Exception { // Test for (long val : expected) { - store.addAndLock(pathInZooKeeper + val, val); + store.addAndLock( + pathInZooKeeper + val, new TestingLongStateHandleHelper.LongStateHandle(val)); } store.releaseAndTryRemoveAll(); @@ -439,7 +462,7 @@ public void testReleaseAndTryRemoveAll() throws Exception { public void testCorruptedData() throws Exception { final TestingLongStateHandleHelper stateStorage = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore store = + ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), stateStorage); final Collection input = new HashSet<>(); @@ -448,21 +471,23 @@ public void testCorruptedData() throws Exception { input.add(3L); for (Long aLong : input) { - store.addAndLock("/" + aLong, aLong); + store.addAndLock("/" + aLong, new TestingLongStateHandleHelper.LongStateHandle(aLong)); } // corrupt one of the entries ZOOKEEPER.getClient().setData().forPath("/" + 2, new byte[2]); - List, String>> allEntries = store.getAllAndLock(); + List, String>> + allEntries = store.getAllAndLock(); Collection expected = new HashSet<>(input); expected.remove(2L); Collection actual = new HashSet<>(expected.size()); - for (Tuple2, String> entry : allEntries) { - actual.add(entry.f0.retrieveState()); + for (Tuple2, String> + entry : allEntries) { + actual.add(entry.f0.retrieveState().getValue()); } assertEquals(expected, actual); @@ -478,23 +503,24 @@ public void testCorruptedData() throws Exception { public void testConcurrentDeleteOperation() throws Exception { final TestingLongStateHandleHelper 
longStateStorage = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore zkStore1 = + ZooKeeperStateHandleStore zkStore1 = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), longStateStorage); - ZooKeeperStateHandleStore zkStore2 = + ZooKeeperStateHandleStore zkStore2 = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), longStateStorage); final String statePath = "/state"; - zkStore1.addAndLock(statePath, 42L); - RetrievableStateHandle stateHandle = zkStore2.getAndLock(statePath); + zkStore1.addAndLock(statePath, new TestingLongStateHandleHelper.LongStateHandle(42L)); + RetrievableStateHandle stateHandle = + zkStore2.getAndLock(statePath); // this should not remove the referenced node because we are still holding a state handle // reference via zkStore2 zkStore1.releaseAndTryRemove(statePath); // sanity check - assertEquals(42L, (long) stateHandle.retrieveState()); + assertEquals(42L, stateHandle.retrieveState().getValue()); Stat nodeStat = ZOOKEEPER.getClient().checkExists().forPath(statePath); @@ -521,15 +547,15 @@ public void testConcurrentDeleteOperation() throws Exception { public void testLockCleanupWhenGetAndLockFails() throws Exception { final TestingLongStateHandleHelper longStateStorage = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore zkStore1 = + ZooKeeperStateHandleStore zkStore1 = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), longStateStorage); - ZooKeeperStateHandleStore zkStore2 = + ZooKeeperStateHandleStore zkStore2 = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), longStateStorage); final String path = "/state"; - zkStore1.addAndLock(path, 42L); + zkStore1.addAndLock(path, new TestingLongStateHandleHelper.LongStateHandle(42L)); final byte[] corruptedData = {1, 2}; @@ -581,12 +607,12 @@ public void testLockCleanupWhenClientTimesOut() throws Exception { try (CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration); CuratorFramework client2 = 
ZooKeeperUtils.startCuratorFramework(configuration)) { - ZooKeeperStateHandleStore zkStore = + ZooKeeperStateHandleStore zkStore = new ZooKeeperStateHandleStore<>(client, longStateStorage); final String path = "/state"; - zkStore.addAndLock(path, 42L); + zkStore.addAndLock(path, new TestingLongStateHandleHelper.LongStateHandle(42L)); // this should delete all ephemeral nodes client.close(); @@ -612,12 +638,12 @@ public void testLockCleanupWhenClientTimesOut() throws Exception { public void testRelease() throws Exception { final TestingLongStateHandleHelper longStateStorage = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore zkStore = + ZooKeeperStateHandleStore zkStore = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), longStateStorage); final String path = "/state"; - zkStore.addAndLock(path, 42L); + zkStore.addAndLock(path, new TestingLongStateHandleHelper.LongStateHandle(42L)); final String lockPath = zkStore.getLockPath(path); @@ -648,13 +674,13 @@ public void testRelease() throws Exception { public void testReleaseAll() throws Exception { final TestingLongStateHandleHelper longStateStorage = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore zkStore = + ZooKeeperStateHandleStore zkStore = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), longStateStorage); final Collection paths = Arrays.asList("/state1", "/state2", "/state3"); for (String path : paths) { - zkStore.addAndLock(path, 42L); + zkStore.addAndLock(path, new TestingLongStateHandleHelper.LongStateHandle(42L)); } for (String path : paths) { @@ -680,12 +706,12 @@ public void testReleaseAll() throws Exception { @Test public void testRemoveAllHandlesShouldRemoveAllPaths() throws Exception { - final ZooKeeperStateHandleStore zkStore = + final ZooKeeperStateHandleStore zkStore = new ZooKeeperStateHandleStore<>( ZooKeeperUtils.useNamespaceAndEnsurePath(ZOOKEEPER.getClient(), "/path"), new TestingLongStateHandleHelper()); - zkStore.addAndLock("/state", 1L); + 
zkStore.addAndLock("/state", new TestingLongStateHandleHelper.LongStateHandle(1L)); zkStore.clearEntries(); assertThat(zkStore.getAllHandles(), is(empty())); From b81887e6470138c10cd0fc2927afbf7f7c80c91c Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Tue, 4 May 2021 09:41:38 +0200 Subject: [PATCH 3/5] [FLINK-22494][ha] Introduces PossibleInconsistentState to StateHandleStore --- .../KubernetesStateHandleStore.java | 74 ++++-- .../KubernetesStateHandleStoreTest.java | 109 +++++++++ .../PossibleInconsistentStateException.java | 6 + .../runtime/persistence/StateHandleStore.java | 13 +- .../runtime/util/StateHandleStoreUtils.java | 76 ++++++ .../zookeeper/ZooKeeperStateHandleStore.java | 150 ++++++++---- .../util/StateHandleStoreUtilsTest.java | 114 +++++++++ .../ZooKeeperStateHandleStoreTest.java | 226 ++++++++++++++++-- 8 files changed, 673 insertions(+), 95 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/util/StateHandleStoreUtils.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/util/StateHandleStoreUtilsTest.java diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java index d225275e3f674..68caf9f387e8a 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java @@ -23,12 +23,12 @@ import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector; +import org.apache.flink.runtime.persistence.PossibleInconsistentStateException; import 
org.apache.flink.runtime.persistence.RetrievableStateStorageHelper; import org.apache.flink.runtime.persistence.StateHandleStore; import org.apache.flink.runtime.persistence.StringResourceVersion; import org.apache.flink.runtime.state.RetrievableStateHandle; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +48,8 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.apache.flink.runtime.util.StateHandleStoreUtils.deserialize; +import static org.apache.flink.runtime.util.StateHandleStoreUtils.serializeOrDiscard; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -114,21 +116,28 @@ public KubernetesStateHandleStore( * @param key Key in ConfigMap * @param state State to be added * @throws AlreadyExistException if the name already exists + * @throws PossibleInconsistentStateException if the write-to-Kubernetes operation failed. This + * indicates that it's not clear whether the new state was successfully written to + * Kubernetes or not. No state was discarded. Proper error handling has to be applied on the + * caller's side. 
* @throws Exception if persisting state or writing state handle failed */ @Override - public RetrievableStateHandle addAndLock(String key, T state) throws Exception { + public RetrievableStateHandle addAndLock(String key, T state) + throws PossibleInconsistentStateException, Exception { checkNotNull(key, "Key in ConfigMap."); checkNotNull(state, "State."); final RetrievableStateHandle storeHandle = storage.store(state); - boolean success = false; + final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle); + // initialize flag to serve the failure case + boolean discardState = true; try { - final byte[] serializedStoreHandle = InstantiationUtil.serializeObject(storeHandle); - success = - kubeClient + // a successful operation will result in the state not being discarded + discardState = + !kubeClient .checkAndUpdateConfigMap( configMapName, c -> { @@ -151,14 +160,20 @@ public RetrievableStateHandle addAndLock(String key, T state) throws Exceptio .get(); return storeHandle; } catch (Exception ex) { + final Optional possibleInconsistentStateException = + ExceptionUtils.findThrowable(ex, PossibleInconsistentStateException.class); + if (possibleInconsistentStateException.isPresent()) { + // it's unclear whether the state handle metadata was written to the ConfigMap - + // hence, we don't discard the data + discardState = false; + throw possibleInconsistentStateException.get(); + } + throw ExceptionUtils.findThrowable(ex, AlreadyExistException.class) .orElseThrow(() -> ex); } finally { - if (!success) { - // Cleanup the state handle if it was not written to ConfigMap. - if (storeHandle != null) { - storeHandle.discardState(); - } + if (discardState) { + storeHandle.discardState(); } } } @@ -173,6 +188,9 @@ public RetrievableStateHandle addAndLock(String key, T state) throws Exceptio * @param resourceVersion resource version when checking existence via {@link #exists}. 
* @param state State to be added * @throws NotExistException if the name does not exist + * @throws PossibleInconsistentStateException if a failure occurred during the update operation. + * It's unclear whether the operation actually succeeded or not. No state was discarded. The + * method's caller should handle this case properly. * @throws Exception if persisting state or writing state handle failed */ @Override @@ -185,11 +203,13 @@ public void replace(String key, StringResourceVersion resourceVersion, T state) final RetrievableStateHandle newStateHandle = storage.store(state); - boolean success = false; + final byte[] serializedStateHandle = serializeOrDiscard(newStateHandle); + // initialize flags to serve the failure case + boolean discardOldState = false; + boolean discardNewState = true; try { - final byte[] serializedStoreHandle = InstantiationUtil.serializeObject(newStateHandle); - success = + boolean success = kubeClient .checkAndUpdateConfigMap( configMapName, @@ -202,7 +222,7 @@ public void replace(String key, StringResourceVersion resourceVersion, T state) .put( key, encodeStateHandle( - serializedStoreHandle)); + serializedStateHandle)); } else { throw new CompletionException( getKeyNotExistException(key)); @@ -212,14 +232,29 @@ public void replace(String key, StringResourceVersion resourceVersion, T state) return Optional.empty(); }) .get(); + + // swap subject for deletion in case of success + discardOldState = success; + discardNewState = !success; } catch (Exception ex) { + final Optional possibleInconsistentStateException = + ExceptionUtils.findThrowable(ex, PossibleInconsistentStateException.class); + if (possibleInconsistentStateException.isPresent()) { + // it's unclear whether the state handle metadata was written to the ConfigMap - + // hence, we don't discard any data + discardNewState = false; + throw possibleInconsistentStateException.get(); + } + throw ExceptionUtils.findThrowable(ex, NotExistException.class).orElseThrow(() -> ex); } 
finally { - if (success) { - oldStateHandle.discardState(); - } else { + if (discardNewState) { newStateHandle.discardState(); } + + if (discardOldState) { + oldStateHandle.discardState(); + } } } @@ -476,8 +511,7 @@ private RetrievableStateHandle deserializeObject(String content) throws IOExc final byte[] data = Base64.getDecoder().decode(content); try { - return InstantiationUtil.deserializeObject( - data, Thread.currentThread().getContextClassLoader()); + return deserialize(data); } catch (IOException | ClassNotFoundException e) { throw new IOException( "Failed to deserialize state handle from ConfigMap data " + content + '.', e); diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java index 6e230f5250fab..b1da85b9ec884 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java @@ -22,6 +22,8 @@ import org.apache.flink.core.testutils.FlinkMatchers; import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.persistence.PossibleInconsistentStateException; import org.apache.flink.runtime.persistence.StateHandleStore; import org.apache.flink.runtime.persistence.StringResourceVersion; import org.apache.flink.runtime.persistence.TestingLongStateHandleHelper; @@ -126,6 +128,44 @@ public void testAddAlreadyExistingKey() throws Exception { }; } + @Test + public void testAddWithPossiblyInconsistentStateHandling() throws Exception { + new Context() { + { + runTest( + () -> { + leaderCallbackGrantLeadership(); + + final FlinkKubeClient 
anotherFlinkKubeClient = + createFlinkKubeClientBuilder() + .setCheckAndUpdateConfigMapFunction( + (configMapName, function) -> + FutureUtils.completedExceptionally( + new PossibleInconsistentStateException())) + .build(); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + anotherFlinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); + + try { + store.addAndLock(key, state); + fail("PossibleInconsistentStateException should have been thrown."); + } catch (PossibleInconsistentStateException ex) { + // PossibleInconsistentStateException is expected + } + assertThat(TestingLongStateHandleHelper.getGlobalStorageSize(), is(1)); + assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(0)); + }); + } + }; + } + @Test public void testAddFailedWhenConfigMapNotExistAndDiscardState() throws Exception { new Context() { @@ -346,6 +386,75 @@ public void testReplaceFailedAndDiscardState() throws Exception { }; } + @Test + public void testReplaceFailedWithPossiblyInconsistentState() throws Exception { + final PossibleInconsistentStateException updateException = + new PossibleInconsistentStateException(); + new Context() { + { + runTest( + () -> { + leaderCallbackGrantLeadership(); + + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); + store.addAndLock(key, state); + + final FlinkKubeClient anotherFlinkKubeClient = + createFlinkKubeClientBuilder() + .setCheckAndUpdateConfigMapFunction( + (configMapName, function) -> + FutureUtils.completedExceptionally( + updateException)) + .build(); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + anotherStore = + new KubernetesStateHandleStore<>( + anotherFlinkKubeClient, + LEADER_CONFIGMAP_NAME, + 
longStateStorage, + filter, + LOCK_IDENTITY); + + final StringResourceVersion resourceVersion = anotherStore.exists(key); + assertThat(resourceVersion.isExisting(), is(true)); + try { + anotherStore.replace( + key, + resourceVersion, + new TestingLongStateHandleHelper.LongStateHandle(23456L)); + fail( + "An exception having a PossibleInconsistentStateException as its cause should have been thrown."); + } catch (Exception ex) { + assertThat(ex, is(updateException)); + } + assertThat(anotherStore.getAllAndLock().size(), is(1)); + // The state does not change + assertThat(anotherStore.getAndLock(key).retrieveState(), is(state)); + + assertThat(TestingLongStateHandleHelper.getGlobalStorageSize(), is(2)); + // no state was discarded + assertThat( + TestingLongStateHandleHelper + .getDiscardCallCountForStateHandleByIndex(0), + is(0)); + assertThat( + TestingLongStateHandleHelper + .getDiscardCallCountForStateHandleByIndex(1), + is(0)); + }); + } + }; + } + @Test public void testGetAndExist() throws Exception { new Context() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/PossibleInconsistentStateException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/PossibleInconsistentStateException.java index fccfde8508e74..364c012e9f1f1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/PossibleInconsistentStateException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/PossibleInconsistentStateException.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.persistence; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.util.FlinkException; /** @@ -28,6 +29,11 @@ public class PossibleInconsistentStateException extends FlinkException { private static final long serialVersionUID = 364105635349022882L; + @VisibleForTesting + public PossibleInconsistentStateException() { + super("The system might be in an inconsistent state."); + } + public 
PossibleInconsistentStateException(String message, Throwable cause) { super(message, cause); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java index cf5d8dc3c512b..dff5d084f9683 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java @@ -50,9 +50,14 @@ public interface StateHandleStore addAndLock(String name, T state) throws Exception; + RetrievableStateHandle addAndLock(String name, T state) + throws PossibleInconsistentStateException, Exception; /** * Replaces a state handle in the distributed coordination system and discards the old state @@ -64,9 +69,13 @@ public interface StateHandleStore The type of data handled by the deserialized {@code RetrievableStateHandle}. + * @return The {@code RetrievableStateHandle} instance. + * @throws IOException Any of the usual Input/Output related exceptions. + * @throws ClassNotFoundException If the data couldn't be deserialized into a {@code + * RetrievableStateHandle} referring to the expected type {@code }. 
+ */ + public static T deserialize(byte[] data) + throws IOException, ClassNotFoundException { + return InstantiationUtil.deserializeObject( + data, Thread.currentThread().getContextClassLoader()); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java index 378d49bb61b7c..a7b4c418698c7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java @@ -18,13 +18,14 @@ package org.apache.flink.runtime.zookeeper; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.persistence.IntegerResourceVersion; +import org.apache.flink.runtime.persistence.PossibleInconsistentStateException; import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper; import org.apache.flink.runtime.persistence.StateHandleStore; import org.apache.flink.runtime.state.RetrievableStateHandle; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.InstantiationUtil; import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework; import org.apache.flink.shaded.curator4.org.apache.curator.utils.ZKPaths; @@ -41,8 +42,12 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.UUID; +import static org.apache.flink.runtime.util.StateHandleStoreUtils.deserialize; +import static org.apache.flink.runtime.util.StateHandleStoreUtils.serializeOrDiscard; +import static org.apache.flink.shaded.guava18.com.google.common.collect.Sets.newHashSet; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -82,6 +87,19 @@ public class ZooKeeperStateHandleStore private static final Logger LOG = 
LoggerFactory.getLogger(ZooKeeperStateHandleStore.class); + @VisibleForTesting + static final Set> PRE_COMMIT_EXCEPTIONS = + newHashSet( + KeeperException.NodeExistsException.class, + KeeperException.BadArgumentsException.class, + KeeperException.NoNodeException.class, + KeeperException.NoAuthException.class, + KeeperException.BadVersionException.class, + KeeperException.AuthFailedException.class, + KeeperException.InvalidACLException.class, + KeeperException.SessionMovedException.class, + KeeperException.NotReadOnlyException.class); + /** Curator ZooKeeper client. */ private final CuratorFramework client; @@ -126,10 +144,14 @@ public ZooKeeperStateHandleStore( * @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet) * @param state State to be added * @return The Created {@link RetrievableStateHandle}. + * @throws PossibleInconsistentStateException if the write-to-ZooKeeper operation failed. This + * indicates that it's not clear whether the new state was successfully written to ZooKeeper + * or not. Proper error handling has to be applied on the caller's side. * @throws Exception If a ZooKeeper or state handle operation fails */ @Override - public RetrievableStateHandle addAndLock(String pathInZooKeeper, T state) throws Exception { + public RetrievableStateHandle addAndLock(String pathInZooKeeper, T state) + throws PossibleInconsistentStateException, Exception { checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); checkNotNull(state, "State"); @@ -137,45 +159,51 @@ public RetrievableStateHandle addAndLock(String pathInZooKeeper, T state) thr RetrievableStateHandle storeHandle = storage.store(state); - boolean success = false; + byte[] serializedStoreHandle = serializeOrDiscard(storeHandle); try { - // Serialize the state handle. This writes the state to the backend. - byte[] serializedStoreHandle = InstantiationUtil.serializeObject(storeHandle); - - // Write state handle (not the actual state) to ZooKeeper. 
This is expected to be - // smaller than the state itself. This level of indirection makes sure that data in - // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but - // the state can be larger. - // Create the lock node in a transaction with the actual state node. That way we can - // prevent - // race conditions with a concurrent delete operation. - client.inTransaction() - .create() - .withMode(CreateMode.PERSISTENT) - .forPath(path, serializedStoreHandle) - .and() - .create() - .withMode(CreateMode.EPHEMERAL) - .forPath(getLockPath(path)) - .and() - .commit(); - - success = true; + writeStoreHandleTransactionally(path, serializedStoreHandle); return storeHandle; - } catch (KeeperException.NodeExistsException e) { - // We wrap the exception here so that it could be caught in DefaultJobGraphStore - throw new AlreadyExistException("ZooKeeper node " + path + " already exists.", e); - } finally { - if (!success) { - // Cleanup the state handle if it was not written to ZooKeeper. - if (storeHandle != null) { - storeHandle.discardState(); - } + } catch (Exception e) { + if (indicatesPossiblyInconsistentState(e)) { + throw new PossibleInconsistentStateException(e); } + + // in any other failure case: discard the state + storeHandle.discardState(); + + // We wrap the exception here so that it could be caught in DefaultJobGraphStore + throw ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class) + .map( + nee -> + new AlreadyExistException( + "ZooKeeper node " + path + " already exists.", nee)) + .orElseThrow(() -> e); } } + // this method is provided for the sole purpose of easier testing + @VisibleForTesting + protected void writeStoreHandleTransactionally(String path, byte[] serializedStoreHandle) + throws Exception { + // Write state handle (not the actual state) to ZooKeeper. This is expected to be + // smaller than the state itself. 
This level of indirection makes sure that data in + // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but + // the state can be larger. + // Create the lock node in a transaction with the actual state node. That way we can + // prevent race conditions with a concurrent delete operation. + client.inTransaction() + .create() + .withMode(CreateMode.PERSISTENT) + .forPath(path, serializedStoreHandle) + .and() + .create() + .withMode(CreateMode.EPHEMERAL) + .forPath(getLockPath(path)) + .and() + .commit(); + } + /** * Replaces a state handle in ZooKeeper and discards the old state handle. * @@ -196,29 +224,55 @@ public void replace(String pathInZooKeeper, IntegerResourceVersion expectedVersi RetrievableStateHandle newStateHandle = storage.store(state); - boolean success = false; + final byte[] serializedStateHandle = serializeOrDiscard(newStateHandle); + // initialize flags to serve the failure case + boolean discardOldState = false; + boolean discardNewState = true; try { - // Serialize the new state handle. This writes the state to the backend. - byte[] serializedStateHandle = InstantiationUtil.serializeObject(newStateHandle); + setStateHandle(path, serializedStateHandle, expectedVersion.getValue()); + + // swap subject for deletion in case of success + discardOldState = true; + discardNewState = false; + } catch (Exception e) { + if (indicatesPossiblyInconsistentState(e)) { + // it's unclear whether the state handle metadata was written to ZooKeeper - + // hence, we don't discard any data + discardNewState = false; + throw new PossibleInconsistentStateException(e); + } - // Replace state handle in ZooKeeper. 
- client.setData() - .withVersion(expectedVersion.getValue()) - .forPath(path, serializedStateHandle); - success = true; - } catch (KeeperException.NoNodeException e) { // We wrap the exception here so that it could be caught in DefaultJobGraphStore - throw new NotExistException("ZooKeeper node " + path + " does not exist.", e); + throw ExceptionUtils.findThrowable(e, KeeperException.NoNodeException.class) + .map( + nnee -> + new NotExistException( + "ZooKeeper node " + path + " does not exist.", nnee)) + .orElseThrow(() -> e); } finally { - if (success) { + if (discardOldState) { oldStateHandle.discardState(); - } else { + } + + if (discardNewState) { newStateHandle.discardState(); } } } + // this method is provided for the sole purpose of easier testing + @VisibleForTesting + protected void setStateHandle(String path, byte[] serializedStateHandle, int expectedVersion) + throws Exception { + // Replace state handle in ZooKeeper. + client.setData().withVersion(expectedVersion).forPath(path, serializedStateHandle); + } + + private boolean indicatesPossiblyInconsistentState(Exception e) { + return !PRE_COMMIT_EXCEPTIONS.contains(e.getClass()); + } + /** * Returns the version of the node if it exists or -1 if it doesn't. 
* @@ -518,9 +572,7 @@ private RetrievableStateHandle get(String pathInZooKeeper, boolean lock) thro try { byte[] data = client.getData().forPath(path); - RetrievableStateHandle retrievableStateHandle = - InstantiationUtil.deserializeObject( - data, Thread.currentThread().getContextClassLoader()); + RetrievableStateHandle retrievableStateHandle = deserialize(data); success = true; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/StateHandleStoreUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/StateHandleStoreUtilsTest.java new file mode 100644 index 0000000000000..eb79ade5b6f7f --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/StateHandleStoreUtilsTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.runtime.persistence.TestingLongStateHandleHelper; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.RunnableWithException; + +import org.junit.Test; + +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * {@code StateHandleStoreUtilsTest} tests the utility methods collected in {@link + * StateHandleStoreUtils}. + */ +public class StateHandleStoreUtilsTest extends TestLogger { + + @Test + public void testSerializationAndDeserialization() throws Exception { + final TestingLongStateHandleHelper.LongStateHandle original = + new TestingLongStateHandleHelper.LongStateHandle(42L); + byte[] serializedData = StateHandleStoreUtils.serializeOrDiscard(original); + + final TestingLongStateHandleHelper.LongStateHandle deserializedInstance = + StateHandleStoreUtils.deserialize(serializedData); + assertThat(deserializedInstance.getStateSize(), is(original.getStateSize())); + assertThat(deserializedInstance.getValue(), is(original.getValue())); + } + + @Test + public void testSerializeOrDiscardFailureHandling() throws Exception { + final AtomicBoolean discardCalled = new AtomicBoolean(false); + final StateObject original = + new FailingSerializationStateObject(() -> discardCalled.set(true)); + + try { + StateHandleStoreUtils.serializeOrDiscard(original); + fail("An IOException is expected to be thrown."); + } catch (IOException e) { + // IOException is expected + } + + assertThat(discardCalled.get(), is(true)); + } + + @Test + public void testSerializationOrDiscardWithDiscardFailure() throws Exception { + final Exception discardException = + new IllegalStateException( + "Expected IllegalStateException that should be 
suppressed."); + final StateObject original = + new FailingSerializationStateObject( + () -> { + throw discardException; + }); + + try { + StateHandleStoreUtils.serializeOrDiscard(original); + fail("An IOException is expected to be thrown."); + } catch (IOException e) { + // IOException is expected + assertThat(e.getSuppressed().length, is(1)); + assertThat(e.getSuppressed()[0], is(discardException)); + } + } + + private static class FailingSerializationStateObject implements StateObject { + + private static final long serialVersionUID = 6382458109061973983L; + private final RunnableWithException discardStateRunnable; + + public FailingSerializationStateObject(RunnableWithException discardStateRunnable) { + this.discardStateRunnable = discardStateRunnable; + } + + private void writeObject(ObjectOutputStream outputStream) throws IOException { + throw new IOException("Expected IOException to test serialization error."); + } + + @Override + public void discardState() throws Exception { + discardStateRunnable.run(); + } + + @Override + public long getStateSize() { + return 0; + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java index e9f0bd249ae09..0acaa80560a56 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java @@ -22,7 +22,9 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.persistence.IntegerResourceVersion; +import org.apache.flink.runtime.persistence.PossibleInconsistentStateException; import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper; +import org.apache.flink.runtime.persistence.StateHandleStore; import 
org.apache.flink.runtime.persistence.TestingLongStateHandleHelper; import org.apache.flink.runtime.state.RetrievableStateHandle; import org.apache.flink.runtime.util.ZooKeeperUtils; @@ -30,8 +32,10 @@ import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework; +import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException; import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.Stat; +import org.hamcrest.core.IsInstanceOf; import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; @@ -132,53 +136,119 @@ public void testAddAndLock() throws Exception { assertEquals(state, actual); } - /** Tests that an existing path throws an Exception. */ - @Test(expected = Exception.class) - public void testAddAlreadyExistingPath() throws Exception { + /** + * Tests that the created state handle is not discarded if ZooKeeper create fails with a + * generic exception. + */ + @Test + public void testFailingAddWithPossiblyInconsistentState() throws Exception { final TestingLongStateHandleHelper stateHandleProvider = new TestingLongStateHandleHelper(); + CuratorFramework client = spy(ZOOKEEPER.getClient()); + when(client.inTransaction().create()) + .thenThrow(new RuntimeException("Expected test Exception.")); + ZooKeeperStateHandleStore store = - new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), stateHandleProvider); + new ZooKeeperStateHandleStore<>(client, stateHandleProvider); - ZOOKEEPER.getClient().create().forPath("/testAddAlreadyExistingPath"); + // Config + final String pathInZooKeeper = "/testAddDiscardStateHandleAfterFailure"; + final long state = 81282227L; - store.addAndLock( - "/testAddAlreadyExistingPath", - new TestingLongStateHandleHelper.LongStateHandle(1L)); + try { + // Test + store.addAndLock( + pathInZooKeeper, new TestingLongStateHandleHelper.LongStateHandle(state)); + fail("PossibleInconsistentStateException should have been thrown."); + } 
catch (PossibleInconsistentStateException ignored) { + // PossibleInconsistentStateException expected + } - // writing to the state storage should have succeeded + // State handle created and not discarded assertEquals(1, TestingLongStateHandleHelper.getGlobalStorageSize()); + assertEquals(state, TestingLongStateHandleHelper.getStateHandleValueByIndex(0)); + assertEquals(0, TestingLongStateHandleHelper.getDiscardCallCountForStateHandleByIndex(0)); + } - // the created state handle should have been cleaned up if the add operation failed - assertEquals(1, TestingLongStateHandleHelper.getDiscardCallCountForStateHandleByIndex(0)); + @Test + public void testAddFailureHandlingForNodeExistsException() { + testFailingAddWithStateDiscardTriggeredFor( + new KeeperException.NodeExistsException(), + StateHandleStore.AlreadyExistException.class); } - /** Tests that the created state handle is discarded if ZooKeeper create fails. */ @Test - public void testAddDiscardStateHandleAfterFailure() { - // Setup - final TestingLongStateHandleHelper stateHandleProvider = new TestingLongStateHandleHelper(); + public void testAddFailureHandlingForBadArgumentsException() { + testFailingAddWithStateDiscardTriggeredFor(new KeeperException.BadArgumentsException()); + } - CuratorFramework client = spy(ZOOKEEPER.getClient()); - when(client.inTransaction().create()) - .thenThrow(new RuntimeException("Expected test Exception.")); + @Test + public void testAddFailureHandlingForNoNodeException() { + testFailingAddWithStateDiscardTriggeredFor(new KeeperException.NoNodeException()); + } + + @Test + public void testAddFailureHandlingForNoAuthException() { + testFailingAddWithStateDiscardTriggeredFor(new KeeperException.NoAuthException()); + } + + @Test + public void testAddFailureHandlingForBadVersionException() { + testFailingAddWithStateDiscardTriggeredFor(new KeeperException.BadVersionException()); + } + + @Test + public void testAddFailureHandlingForAuthFailedException() { + 
testFailingAddWithStateDiscardTriggeredFor(new KeeperException.AuthFailedException()); + } + + @Test + public void testAddFailureHandlingForInvalidACLException() { + testFailingAddWithStateDiscardTriggeredFor(new KeeperException.InvalidACLException()); + } + + @Test + public void testAddFailureHandlingForSessionMovedException() { + testFailingAddWithStateDiscardTriggeredFor(new KeeperException.SessionMovedException()); + } + + @Test + public void testAddFailureHandlingForNotReadOnlyException() { + testFailingAddWithStateDiscardTriggeredFor(new KeeperException.NotReadOnlyException()); + } + + private static void testFailingAddWithStateDiscardTriggeredFor(Exception actualException) { + testFailingAddWithStateDiscardTriggeredFor(actualException, actualException.getClass()); + } + + private static void testFailingAddWithStateDiscardTriggeredFor( + Exception actualException, Class expectedException) { + final TestingLongStateHandleHelper stateHandleProvider = new TestingLongStateHandleHelper(); ZooKeeperStateHandleStore store = - new ZooKeeperStateHandleStore<>(client, stateHandleProvider); + new ZooKeeperStateHandleStore( + ZOOKEEPER.getClient(), stateHandleProvider) { + @Override + protected void writeStoreHandleTransactionally( + String path, byte[] serializedStoreHandle) throws Exception { + throw actualException; + } + }; // Config - final String pathInZooKeeper = "/testAddDiscardStateHandleAfterFailure"; + final String pathInZooKeeper = + "/testAddDiscardStateHandleAfterFailure-" + expectedException.getSimpleName(); final long state = 81282227L; try { // Test store.addAndLock( pathInZooKeeper, new TestingLongStateHandleHelper.LongStateHandle(state)); - fail("Did not throw expected exception"); - } catch (Exception ignored) { + fail(expectedException.getSimpleName() + " should have been thrown."); + } catch (Exception ex) { + assertThat(ex, IsInstanceOf.instanceOf(expectedException)); } - // Verify // State handle created and discarded assertEquals(1, 
TestingLongStateHandleHelper.getGlobalStorageSize()); assertEquals(state, TestingLongStateHandleHelper.getStateHandleValueByIndex(0)); @@ -281,7 +351,115 @@ public void testReplaceDiscardStateHandleAfterFailure() throws Exception { assertEquals(2, TestingLongStateHandleHelper.getGlobalStorageSize()); assertEquals(initialState, TestingLongStateHandleHelper.getStateHandleValueByIndex(0)); assertEquals(replaceState, TestingLongStateHandleHelper.getStateHandleValueByIndex(1)); - assertEquals(1, TestingLongStateHandleHelper.getDiscardCallCountForStateHandleByIndex(1)); + assertThat(TestingLongStateHandleHelper.getDiscardCallCountForStateHandleByIndex(0), is(0)); + assertThat(TestingLongStateHandleHelper.getDiscardCallCountForStateHandleByIndex(1), is(0)); + + // Initial value + @SuppressWarnings("unchecked") + final long actual = + ((RetrievableStateHandle) + InstantiationUtil.deserializeObject( + ZOOKEEPER.getClient().getData().forPath(pathInZooKeeper), + ClassLoader.getSystemClassLoader())) + .retrieveState() + .getValue(); + + assertEquals(initialState, actual); + } + + @Test + public void testDiscardAfterReplaceFailureWithNoNodeException() throws Exception { + testDiscardAfterReplaceFailureWith( + new KeeperException.NoNodeException(), StateHandleStore.NotExistException.class); + } + + @Test + public void testDiscardAfterReplaceFailureWithNodeExistsException() throws Exception { + testDiscardAfterReplaceFailureWith(new KeeperException.NodeExistsException()); + } + + @Test + public void testDiscardAfterReplaceFailureWithBadArgumentsException() throws Exception { + testDiscardAfterReplaceFailureWith(new KeeperException.BadArgumentsException()); + } + + @Test + public void testDiscardAfterReplaceFailureWithNoAuthException() throws Exception { + testDiscardAfterReplaceFailureWith(new KeeperException.NoAuthException()); + } + + @Test + public void testDiscardAfterReplaceFailureWithBadVersionException() throws Exception { + testDiscardAfterReplaceFailureWith(new 
KeeperException.BadVersionException()); + } + + @Test + public void testDiscardAfterReplaceFailureWithAuthFailedException() throws Exception { + testDiscardAfterReplaceFailureWith(new KeeperException.AuthFailedException()); + } + + @Test + public void testDiscardAfterReplaceFailureWithInvalidACLException() throws Exception { + testDiscardAfterReplaceFailureWith(new KeeperException.InvalidACLException()); + } + + @Test + public void testDiscardAfterReplaceFailureWithSessionMovedException() throws Exception { + testDiscardAfterReplaceFailureWith(new KeeperException.SessionMovedException()); + } + + @Test + public void testDiscardAfterReplaceFailureWithNotReadOnlyException() throws Exception { + testDiscardAfterReplaceFailureWith(new KeeperException.NotReadOnlyException()); + } + + private static void testDiscardAfterReplaceFailureWith(Exception actualException) + throws Exception { + testDiscardAfterReplaceFailureWith(actualException, actualException.getClass()); + } + + private static void testDiscardAfterReplaceFailureWith( + Exception actualException, Class expectedException) + throws Exception { + final TestingLongStateHandleHelper stateHandleProvider = new TestingLongStateHandleHelper(); + + ZooKeeperStateHandleStore store = + new ZooKeeperStateHandleStore( + ZOOKEEPER.getClient(), stateHandleProvider) { + @Override + protected void setStateHandle( + String path, byte[] serializedStateHandle, int expectedVersion) + throws Exception { + throw actualException; + } + }; + + // Config + final String pathInZooKeeper = + "/testReplaceDiscardStateHandleAfterFailure-" + expectedException.getSimpleName(); + final long initialState = 30968470898L; + final long replaceState = 88383776661L; + + // Test + store.addAndLock( + pathInZooKeeper, new TestingLongStateHandleHelper.LongStateHandle(initialState)); + + try { + store.replace( + pathInZooKeeper, + IntegerResourceVersion.valueOf(0), + new TestingLongStateHandleHelper.LongStateHandle(replaceState)); + fail("Did not throw 
expected exception"); + } catch (Throwable t) { + assertThat(t, IsInstanceOf.instanceOf(expectedException)); + } + + // State handle created and discarded + assertEquals(2, TestingLongStateHandleHelper.getGlobalStorageSize()); + assertEquals(initialState, TestingLongStateHandleHelper.getStateHandleValueByIndex(0)); + assertEquals(replaceState, TestingLongStateHandleHelper.getStateHandleValueByIndex(1)); + assertThat(TestingLongStateHandleHelper.getDiscardCallCountForStateHandleByIndex(0), is(0)); + assertThat(TestingLongStateHandleHelper.getDiscardCallCountForStateHandleByIndex(1), is(1)); // Initial value @SuppressWarnings("unchecked") From 5aec1bb94946b736ef2c6fb631ebf2bf57bdd979 Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Tue, 4 May 2021 17:37:35 +0200 Subject: [PATCH 4/5] [FLINK-22494][runtime] Refactors CheckpointsCleaner to handle also discardOnFailedStoring --- .../checkpoint/CheckpointCoordinator.java | 15 +----- .../checkpoint/CheckpointsCleaner.java | 46 ++++++++++++++----- 2 files changed, 36 insertions(+), 25 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index f4d5b9141bc99..7c7edd859bd2f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -1211,20 +1211,7 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) completedCheckpoint, checkpointsCleaner, this::scheduleTriggerRequest); } catch (Exception exception) { // we failed to store the completed checkpoint. 
Let's clean up - executor.execute( - new Runnable() { - @Override - public void run() { - try { - completedCheckpoint.discardOnFailedStoring(); - } catch (Throwable t) { - LOG.warn( - "Could not properly discard completed checkpoint {}.", - completedCheckpoint.getCheckpointID(), - t); - } - } - }); + checkpointsCleaner.cleanCheckpointOnFailedStoring(completedCheckpoint, executor); sendAbortedMessages(checkpointId, pendingCheckpoint.getCheckpointTimestamp()); throw new CheckpointException( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java index ff2c6bef4e322..a1de9de330ac3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.util.function.RunnableWithException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,6 +33,7 @@ */ public class CheckpointsCleaner implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(CheckpointsCleaner.class); + private static final long serialVersionUID = 2545865801947537790L; private final AtomicInteger numberOfCheckpointsToClean; @@ -47,23 +50,44 @@ public void cleanCheckpoint( boolean shouldDiscard, Runnable postCleanAction, Executor executor) { + cleanup( + checkpoint, + () -> { + if (shouldDiscard) { + checkpoint.discard(); + } + }, + postCleanAction, + executor); + } + + public void cleanCheckpointOnFailedStoring( + CompletedCheckpoint completedCheckpoint, Executor executor) { + cleanup( + completedCheckpoint, + completedCheckpoint::discardOnFailedStoring, + () -> {}, + executor); + } + + private void cleanup( + Checkpoint checkpoint, + RunnableWithException cleanupAction, + Runnable postCleanupAction, + Executor executor) { 
numberOfCheckpointsToClean.incrementAndGet(); executor.execute( () -> { try { - if (shouldDiscard) { - try { - checkpoint.discard(); - } catch (Exception e) { - LOG.warn( - "Could not discard completed checkpoint {}.", - checkpoint.getCheckpointID(), - e); - } - } + cleanupAction.run(); + } catch (Exception e) { + LOG.warn( + "Could not properly discard completed checkpoint {}.", + checkpoint.getCheckpointID(), + e); } finally { numberOfCheckpointsToClean.decrementAndGet(); - postCleanAction.run(); + postCleanupAction.run(); } }); } From 6b53f8b0e01d36f546bd52c8630c9b98ee2eed0a Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Tue, 4 May 2021 15:32:36 +0200 Subject: [PATCH 5/5] [FLINK-22494][runtime] Adds PossibleInconsistentStateException handling to CheckpointCoordinator --- .../checkpoint/CheckpointCoordinator.java | 13 ++- .../DefaultCompletedCheckpointStore.java | 4 + .../CheckpointCoordinatorFailureTest.java | 92 ++++++++++++++++++- .../CheckpointCoordinatorTestingUtils.java | 6 ++ 4 files changed, 111 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 7c7edd859bd2f..dbba5f0ebfd50 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorInfo; +import org.apache.flink.runtime.persistence.PossibleInconsistentStateException; import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView; import org.apache.flink.runtime.state.CheckpointStorageLocation; import 
org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; @@ -1210,8 +1211,16 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) completedCheckpointStore.addCheckpoint( completedCheckpoint, checkpointsCleaner, this::scheduleTriggerRequest); } catch (Exception exception) { - // we failed to store the completed checkpoint. Let's clean up - checkpointsCleaner.cleanCheckpointOnFailedStoring(completedCheckpoint, executor); + if (exception instanceof PossibleInconsistentStateException) { + LOG.warn( + "An error occurred while writing checkpoint {} to the underlying metadata store. Flink was not able to determine whether the metadata was successfully persisted. The corresponding state located at '{}' won't be discarded and needs to be cleaned up manually.", + completedCheckpoint.getCheckpointID(), + completedCheckpoint.getExternalPointer()); + } else { + // we failed to store the completed checkpoint. Let's clean up + checkpointsCleaner.cleanCheckpointOnFailedStoring( + completedCheckpoint, executor); + } sendAbortedMessages(checkpointId, pendingCheckpoint.getCheckpointTimestamp()); throw new CheckpointException( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java index f99ceea937901..98549b64b317d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.persistence.PossibleInconsistentStateException; import org.apache.flink.runtime.persistence.ResourceVersion; import org.apache.flink.runtime.persistence.StateHandleStore; import 
org.apache.flink.runtime.state.RetrievableStateHandle; @@ -212,6 +213,9 @@ public void recover() throws Exception { * older ones. * * @param checkpoint Completed checkpoint to add. + * @throws PossibleInconsistentStateException if adding the checkpoint failed, leaving the + * system in a possibly inconsistent state, i.e. it's uncertain whether the checkpoint + * metadata was fully written to the underlying systems or not. */ @Override public void addCheckpoint( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java index ddc2aac905fdb..f855eff2264d0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java @@ -28,21 +28,29 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.persistence.PossibleInconsistentStateException; import org.apache.flink.runtime.state.InputChannelStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.OperatorStreamStateHandle; import org.apache.flink.runtime.state.ResultSubpartitionStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.TestLogger; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.util.Collections; import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; +import static org.hamcrest.CoreMatchers.is; import static 
org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -53,6 +61,8 @@ /** Tests for failure of checkpoint coordinator. */ public class CheckpointCoordinatorFailureTest extends TestLogger { + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + /** * Tests that a failure while storing a completed checkpoint in the completed checkpoint store * will properly fail the originating pending checkpoint and clean upt the completed checkpoint. @@ -73,7 +83,10 @@ public void testFailingCompletedCheckpointStoreAdd() throws Exception { new CheckpointCoordinatorBuilder() .setJobId(jid) .setTasks(new ExecutionVertex[] {vertex}) - .setCompletedCheckpointStore(new FailingCompletedCheckpointStore()) + .setCompletedCheckpointStore( + new FailingCompletedCheckpointStore( + new Exception( + "The failing completed checkpoint store failed again... 
:-("))) .setTimer(manuallyTriggeredScheduledExecutor) .build(); @@ -158,8 +171,83 @@ public void testFailingCompletedCheckpointStoreAdd() throws Exception { .discardState(); } + @Test + public void testCleanupForGenericFailure() throws Exception { + testStoringFailureHandling(new FlinkRuntimeException("Expected exception"), 1); + } + + @Test + public void testCleanupOmissionForPossibleInconsistentStateException() throws Exception { + testStoringFailureHandling(new PossibleInconsistentStateException(), 0); + } + + private void testStoringFailureHandling(Exception failure, int expectedCleanupCalls) + throws Exception { + final JobID jobId = new JobID(); + + final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID(); + final ExecutionVertex vertex = + CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptId); + + final StandaloneCheckpointIDCounter checkpointIDCounter = + new StandaloneCheckpointIDCounter(); + + final ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor = + new ManuallyTriggeredScheduledExecutor(); + + final CompletedCheckpointStore completedCheckpointStore = + new FailingCompletedCheckpointStore(failure); + + final AtomicInteger cleanupCallCount = new AtomicInteger(0); + final CheckpointCoordinator checkpointCoordinator = + new CheckpointCoordinatorBuilder() + .setJobId(jobId) + .setTasks(new ExecutionVertex[] {vertex}) + .setCheckpointIDCounter(checkpointIDCounter) + .setCheckpointsCleaner( + new CheckpointsCleaner() { + + private static final long serialVersionUID = + 2029876992397573325L; + + @Override + public void cleanCheckpointOnFailedStoring( + CompletedCheckpoint completedCheckpoint, + Executor executor) { + cleanupCallCount.incrementAndGet(); + super.cleanCheckpointOnFailedStoring( + completedCheckpoint, executor); + } + }) + .setCompletedCheckpointStore(completedCheckpointStore) + .setTimer(manuallyTriggeredScheduledExecutor) + .build(); + 
checkpointCoordinator.triggerSavepoint(tmpFolder.newFolder().getAbsolutePath()); + manuallyTriggeredScheduledExecutor.triggerAll(); + + try { + checkpointCoordinator.receiveAcknowledgeMessage( + new AcknowledgeCheckpoint( + jobId, executionAttemptId, checkpointIDCounter.getLast()), + "unknown location"); + fail("CheckpointException should have been thrown."); + } catch (CheckpointException e) { + assertThat( + e.getCheckpointFailureReason(), + is(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE)); + } + + assertThat(cleanupCallCount.get(), is(expectedCleanupCalls)); + } + private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore { + private final Exception addCheckpointFailure; + + public FailingCompletedCheckpointStore(Exception addCheckpointFailure) { + this.addCheckpointFailure = addCheckpointFailure; + } + @Override public void recover() throws Exception { throw new UnsupportedOperationException("Not implemented."); @@ -171,7 +259,7 @@ public void addCheckpoint( CheckpointsCleaner checkpointsCleaner, Runnable postCleanup) throws Exception { - throw new Exception("The failing completed checkpoint store failed again... 
:-("); + throw addCheckpointFailure; } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java index 481e25d9072ad..3318230a2e1b1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java @@ -706,6 +706,12 @@ public CheckpointCoordinatorBuilder setCoordinatorsToCheckpoint( return this; } + public CheckpointCoordinatorBuilder setCheckpointsCleaner( + CheckpointsCleaner checkpointsCleaner) { + this.checkpointsCleaner = checkpointsCleaner; + return this; + } + public CheckpointCoordinatorBuilder setCheckpointIDCounter( CheckpointIDCounter checkpointIDCounter) { this.checkpointIDCounter = checkpointIDCounter;