From a082ea051ce8c9a923f1c1368e119e3fce25c3ca Mon Sep 17 00:00:00 2001 From: ibessonov Date: Fri, 11 Dec 2020 15:10:26 +0300 Subject: [PATCH] IGNITE-13101 Metastore should complete all write futures during stop and prohibit creating new ones - Fixes #8554. Signed-off-by: Sergey Chugunov --- .../DistributedMetaStorageImpl.java | 105 +++++++++++++++--- .../processors/metric/GridMetricManager.java | 5 +- .../DistributedMetaStorageTest.java | 16 +++ 3 files changed, 109 insertions(+), 17 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java index 4ffd0caaa7396..880f6454eb610 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiConsumer; import java.util.function.Predicate; @@ -41,6 +42,7 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; @@ -59,12 +61,14 @@ import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.apache.ignite.spi.IgniteNodeValidationResult; +import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData; @@ -175,6 +179,12 @@ public class DistributedMetaStorageImpl extends GridProcessorAdapter */ private final ConcurrentMap> updateFuts = new ConcurrentHashMap<>(); + /** */ + private final ReadWriteLock updateFutsStopLock = new ReentrantReadWriteLock(); + + /** */ + private boolean stopped; + /** * Lock to access/update data and component's state. */ @@ -287,7 +297,7 @@ public DistributedMetaStorageImpl(GridKernalContext ctx) { finally { lock.writeLock().unlock(); - cancelUpdateFutures(); + cancelUpdateFutures(nodeStoppingException(), true); } } @@ -914,7 +924,7 @@ private String validatePayload(DistributedMetaStorageJoiningNodeData joiningData ver = INITIAL_VERSION; - cancelUpdateFutures(); + cancelUpdateFutures(new IgniteCheckedException("Client was disconnected during the operation."), false); } finally { lock.writeLock().unlock(); @@ -924,13 +934,28 @@ private String validatePayload(DistributedMetaStorageJoiningNodeData joiningData /** * Cancel all waiting futures and clear the map. */ - private void cancelUpdateFutures() { - for (GridFutureAdapter fut : updateFuts.values()) - fut.onDone(new IgniteCheckedException("Client was disconnected during the operation.")); + private void cancelUpdateFutures(Exception e, boolean stop) { + updateFutsStopLock.writeLock().lock(); + + try { + stopped = stop; + + for (GridFutureAdapter fut : updateFuts.values()) + fut.onDone(e); - updateFuts.clear(); + updateFuts.clear(); + } + finally { + updateFutsStopLock.writeLock().unlock(); + } + } + + /** */ + private static NodeStoppingException nodeStoppingException() { + return new NodeStoppingException("Node is stopping."); } + /** {@inheritDoc} */ @Override public IgniteInternalFuture onReconnected(boolean clusterRestarted) { assert isClient; @@ -1033,14 +1058,12 @@ else if (!isClient && ver.id() > 0) { * @throws IgniteCheckedException If there was an error while sending discovery message. */ private GridFutureAdapter startWrite(String key, byte[] valBytes) throws IgniteCheckedException { - if (!isSupported(ctx)) - throw new IgniteCheckedException(NOT_SUPPORTED_MSG); - UUID reqId = UUID.randomUUID(); - GridFutureAdapter fut = new GridFutureAdapter<>(); + GridFutureAdapter fut = prepareWriteFuture(key, reqId); - updateFuts.put(reqId, fut); + if (fut.isDone()) + return fut; DiscoveryCustomMessage msg = new DistributedMetaStorageUpdateMessage(reqId, key, valBytes); @@ -1054,14 +1077,12 @@ private GridFutureAdapter startWrite(String key, byte[] valBytes) throws Igni */ private GridFutureAdapter startCas(String key, byte[] expValBytes, byte[] newValBytes) throws IgniteCheckedException { - if (!isSupported(ctx)) - throw new IgniteCheckedException(NOT_SUPPORTED_MSG); - UUID reqId = UUID.randomUUID(); - GridFutureAdapter fut = new GridFutureAdapter<>(); + GridFutureAdapter fut = prepareWriteFuture(key, reqId); - updateFuts.put(reqId, fut); + if (fut.isDone()) + return fut; DiscoveryCustomMessage msg = new DistributedMetaStorageCasMessage(reqId, key, expValBytes, newValBytes); @@ -1070,6 +1091,58 @@ private GridFutureAdapter startCas(String key, byte[] expValBytes, byte return fut; } + /** + * This method will perform some preliminary checks before starting write or cas operation. + * It also updates {@link #updateFuts} in case if everything's ok. + * + * Tricky part is exception handling from "isSupported" method. It can be thrown by + * {@code ZookeeperDiscoveryImpl#checkState()} method, but we can't just leave it as is. + * There are components that rely on distributed metastorage throwing {@link NodeStoppingException}. + * + * @return Future that must be returned immediately or {@code null}. + * @throws IgniteCheckedException If cluster can't perform this update. + */ + private GridFutureAdapter prepareWriteFuture(String key, UUID reqId) throws IgniteCheckedException { + boolean supported; + + try { + supported = isSupported(ctx); + } + catch (Exception e) { + if (X.hasCause(e, IgniteSpiException.class) && e.getMessage() != null && e.getMessage().contains("Node stopped.")) { + GridFutureAdapter fut = new GridFutureAdapter<>(); + + fut.onDone(nodeStoppingException()); + + return fut; + } + + throw e; + } + + if (!supported) + throw new IgniteCheckedException(NOT_SUPPORTED_MSG); + + GridFutureAdapter fut = new GridFutureAdapter<>(); + + updateFutsStopLock.readLock().lock(); + + try { + if (stopped) { + fut.onDone(nodeStoppingException()); + + return fut; + } + + updateFuts.put(reqId, fut); + } + finally { + updateFutsStopLock.readLock().unlock(); + } + + return fut; + } + /** * Invoked when {@link DistributedMetaStorageUpdateMessage} received. Attempts to store received data (depends on * current {@link #bridge} value). Invokes failure handler with critical error if attempt failed for some reason. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java index fddf7ff91fab2..267b2a11ccab2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java @@ -439,8 +439,11 @@ else if (m instanceof HistogramMetric) opsFut.markInitialized(); opsFut.get(); } + catch (NodeStoppingException ignored) { + // No-op. + } catch (IgniteCheckedException e) { - throw new IgniteException(e); + log.error("Failed to remove metrics configuration.", e); } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java index 763a806c3a8e2..298e20213b9c2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java @@ -22,6 +22,7 @@ import java.util.Comparator; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.configuration.DataRegionConfiguration; @@ -31,7 +32,9 @@ import org.apache.ignite.failure.StopNodeFailureHandler; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -133,6 +136,19 @@ public void testSingleNode() throws Exception { metastorage.remove("key"); assertNull(metastorage.read("key")); + + stopGrid(0); + + try { + metastorage.writeAsync("key", "value").get(10, TimeUnit.SECONDS); + + fail("Exception is expected"); + } + catch (Exception e) { + assertTrue(X.hasCause(e, NodeStoppingException.class)); + + assertTrue(e.getMessage().contains("Node is stopping.")); + } } /**