Skip to content

Commit

Permalink
IGNITE-13101 Metastore should complete all write futures during stop …
Browse files Browse the repository at this point in the history
…and prohibit creating new ones - Fixes #8554.

Signed-off-by: Sergey Chugunov <sergey.chugunov@gmail.com>
  • Loading branch information
ibessonov authored and sergey-chugunov-1985 committed Dec 11, 2020
1 parent f48f31e commit a082ea0
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
Expand All @@ -41,6 +42,7 @@
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
Expand All @@ -59,12 +61,14 @@
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.spi.IgniteNodeValidationResult;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
Expand Down Expand Up @@ -175,6 +179,12 @@ public class DistributedMetaStorageImpl extends GridProcessorAdapter
*/
private final ConcurrentMap<UUID, GridFutureAdapter<Boolean>> updateFuts = new ConcurrentHashMap<>();

/** */
private final ReadWriteLock updateFutsStopLock = new ReentrantReadWriteLock();

/** */
private boolean stopped;

/**
* Lock to access/update data and component's state.
*/
Expand Down Expand Up @@ -287,7 +297,7 @@ public DistributedMetaStorageImpl(GridKernalContext ctx) {
finally {
lock.writeLock().unlock();

cancelUpdateFutures();
cancelUpdateFutures(nodeStoppingException(), true);
}
}

Expand Down Expand Up @@ -914,7 +924,7 @@ private String validatePayload(DistributedMetaStorageJoiningNodeData joiningData

ver = INITIAL_VERSION;

cancelUpdateFutures();
cancelUpdateFutures(new IgniteCheckedException("Client was disconnected during the operation."), false);
}
finally {
lock.writeLock().unlock();
Expand All @@ -924,13 +934,28 @@ private String validatePayload(DistributedMetaStorageJoiningNodeData joiningData
/**
* Cancel all waiting futures and clear the map.
*/
private void cancelUpdateFutures() {
for (GridFutureAdapter<Boolean> fut : updateFuts.values())
fut.onDone(new IgniteCheckedException("Client was disconnected during the operation."));
private void cancelUpdateFutures(Exception e, boolean stop) {
updateFutsStopLock.writeLock().lock();

try {
stopped = stop;

for (GridFutureAdapter<Boolean> fut : updateFuts.values())
fut.onDone(e);

updateFuts.clear();
updateFuts.clear();
}
finally {
updateFutsStopLock.writeLock().unlock();
}
}

/** */
private static NodeStoppingException nodeStoppingException() {
return new NodeStoppingException("Node is stopping.");
}


/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) {
assert isClient;
Expand Down Expand Up @@ -1033,14 +1058,12 @@ else if (!isClient && ver.id() > 0) {
* @throws IgniteCheckedException If there was an error while sending discovery message.
*/
private GridFutureAdapter<?> startWrite(String key, byte[] valBytes) throws IgniteCheckedException {
if (!isSupported(ctx))
throw new IgniteCheckedException(NOT_SUPPORTED_MSG);

UUID reqId = UUID.randomUUID();

GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();
GridFutureAdapter<?> fut = prepareWriteFuture(key, reqId);

updateFuts.put(reqId, fut);
if (fut.isDone())
return fut;

DiscoveryCustomMessage msg = new DistributedMetaStorageUpdateMessage(reqId, key, valBytes);

Expand All @@ -1054,14 +1077,12 @@ private GridFutureAdapter<?> startWrite(String key, byte[] valBytes) throws Igni
*/
private GridFutureAdapter<Boolean> startCas(String key, byte[] expValBytes, byte[] newValBytes)
throws IgniteCheckedException {
if (!isSupported(ctx))
throw new IgniteCheckedException(NOT_SUPPORTED_MSG);

UUID reqId = UUID.randomUUID();

GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();
GridFutureAdapter<Boolean> fut = prepareWriteFuture(key, reqId);

updateFuts.put(reqId, fut);
if (fut.isDone())
return fut;

DiscoveryCustomMessage msg = new DistributedMetaStorageCasMessage(reqId, key, expValBytes, newValBytes);

Expand All @@ -1070,6 +1091,58 @@ private GridFutureAdapter<Boolean> startCas(String key, byte[] expValBytes, byte
return fut;
}

/**
* This method will perform some preliminary checks before starting write or cas operation.
* It also updates {@link #updateFuts} in case if everything's ok.
*
* Tricky part is exception handling from "isSupported" method. It can be thrown by
* {@code ZookeeperDiscoveryImpl#checkState()} method, but we can't just leave it as is.
* There are components that rely on distributed metastorage throwing {@link NodeStoppingException}.
*
* @return Future that must be returned immediately or {@code null}.
* @throws IgniteCheckedException If cluster can't perform this update.
*/
private GridFutureAdapter<Boolean> prepareWriteFuture(String key, UUID reqId) throws IgniteCheckedException {
boolean supported;

try {
supported = isSupported(ctx);
}
catch (Exception e) {
if (X.hasCause(e, IgniteSpiException.class) && e.getMessage() != null && e.getMessage().contains("Node stopped.")) {
GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();

fut.onDone(nodeStoppingException());

return fut;
}

throw e;
}

if (!supported)
throw new IgniteCheckedException(NOT_SUPPORTED_MSG);

GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();

updateFutsStopLock.readLock().lock();

try {
if (stopped) {
fut.onDone(nodeStoppingException());

return fut;
}

updateFuts.put(reqId, fut);
}
finally {
updateFutsStopLock.readLock().unlock();
}

return fut;
}

/**
* Invoked when {@link DistributedMetaStorageUpdateMessage} received. Attempts to store received data (depends on
* current {@link #bridge} value). Invokes failure handler with critical error if attempt failed for some reason.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,11 @@ else if (m instanceof HistogramMetric)
opsFut.markInitialized();
opsFut.get();
}
catch (NodeStoppingException ignored) {
// No-op.
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
log.error("Failed to remove metrics configuration.", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Comparator;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.configuration.DataRegionConfiguration;
Expand All @@ -31,7 +32,9 @@
import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
Expand Down Expand Up @@ -133,6 +136,19 @@ public void testSingleNode() throws Exception {
metastorage.remove("key");

assertNull(metastorage.read("key"));

stopGrid(0);

try {
metastorage.writeAsync("key", "value").get(10, TimeUnit.SECONDS);

fail("Exception is expected");
}
catch (Exception e) {
assertTrue(X.hasCause(e, NodeStoppingException.class));

assertTrue(e.getMessage().contains("Node is stopping."));
}
}

/**
Expand Down

0 comments on commit a082ea0

Please sign in to comment.