diff --git a/pom.xml b/pom.xml index e21226c..e635be9 100644 --- a/pom.xml +++ b/pom.xml @@ -104,7 +104,7 @@ ch.qos.logback logback-classic - 1.2.11 + 1.4.12 test @@ -162,14 +162,44 @@ 1 true - - **/it/**/*Test.java - + + + + org.apache.maven.plugins + maven-failsafe-plugin + 2.19.1 + + false + + true + true + true + false + 1000 + io.vertx.core.logging.SLF4JLogDelegateFactory + PARANOID + ${project.build.directory} + ${project.version} + + + + -Xms512M -Xmx1200M + + 1 + true + + maven-surefire-plugin + + + **/it/**/*Test.java + + + maven-failsafe-plugin @@ -216,6 +246,21 @@ + + org.apache.maven.plugins + maven-failsafe-plugin + + + + -Xms512M -Xmx1200M + + --add-opens=java.base/java.nio=ALL-UNNAMED + --add-opens=java.base/java.util=ALL-UNNAMED + --add-opens=java.base/java.lang.invoke=ALL-UNNAMED + --add-opens=java.base/java.lang=ALL-UNNAMED + + + diff --git a/src/main/java/io/vertx/spi/cluster/ignite/IgniteClusterManager.java b/src/main/java/io/vertx/spi/cluster/ignite/IgniteClusterManager.java index c57a04e..183fb1e 100644 --- a/src/main/java/io/vertx/spi/cluster/ignite/IgniteClusterManager.java +++ b/src/main/java/io/vertx/spi/cluster/ignite/IgniteClusterManager.java @@ -50,7 +50,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; -import java.util.stream.Collectors; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -197,7 +196,7 @@ public void nodeListener(NodeListener nodeListener) { @Override public void getAsyncMap(String name, Promise> promise) { - vertx.>executeBlocking(prom -> prom.complete(new AsyncMapImpl<>(getCache(name), vertx))).onComplete(promise); + vertx.>executeBlocking(() -> new AsyncMapImpl<>(getCache(name), vertx)).onComplete(promise); } @Override @@ -207,7 +206,7 @@ public Map getSyncMap(String name) { @Override public void getLockWithTimeout(String name, long timeout, Promise promise) { - vertx.executeBlocking(prom -> { + vertx.executeBlocking(() -> { IgniteSemaphore semaphore = ignite.semaphore(LOCK_SEMAPHORE_PREFIX + name, 1, true, true); boolean locked; long remaining = timeout; @@ -217,7 +216,7 @@ public void getLockWithTimeout(String name, long timeout, Promise promise) remaining = remaining - TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, NANOSECONDS); } while (!locked && remaining > 0); if (locked) { - prom.complete(new LockImpl(semaphore, lockReleaseExec)); + return new LockImpl(semaphore, lockReleaseExec); } else { throw new VertxException("Timed out waiting to get lock " + name); } @@ -226,7 +225,7 @@ public void getLockWithTimeout(String name, long timeout, Promise promise) @Override public void getCounter(String name, Promise promise) { - vertx.executeBlocking(prom -> prom.complete(new CounterImpl(ignite.atomicLong(name, 0, true)))).onComplete(promise); + vertx.executeBlocking(() -> new CounterImpl(ignite.atomicLong(name, 0, true))).onComplete(promise); } @Override @@ -240,9 +239,9 @@ public void setNodeInfo(NodeInfo nodeInfo, Promise promise) { this.nodeInfo = nodeInfo; } IgniteNodeInfo value = new IgniteNodeInfo(nodeInfo); - vertx.executeBlocking(prom -> { + vertx.executeBlocking(() -> { nodeInfoMap.put(nodeId, value); - prom.complete(); + return null; }, false).onComplete(promise); } @@ -270,8 +269,15 @@ public void getNodeInfo(String id, Promise promise) { @Override public List getNodes() { try { - return ignite.cluster().nodes().stream() - .map(IgniteClusterManager::nodeId).collect(Collectors.toList()); + Collection nodes = ignite.cluster().nodes(); + List nodeIds; + synchronized (nodes) { + nodeIds = new ArrayList<>(nodes.size()); + for (ClusterNode node : nodes) { + nodeIds.add(nodeId(node)); + } + } + return nodeIds; } catch (IllegalStateException e) { log.debug(e.getMessage()); return Collections.emptyList(); @@ -280,7 +286,7 @@ public List getNodes() { @Override public void join(Promise promise) { - vertx.executeBlocking(prom -> { + vertx.executeBlocking(() -> { synchronized (monitor) { if (!active) { active = true; @@ -303,14 +309,13 @@ public void join(Promise promise) { return false; } - vertx.executeBlocking(f -> { + vertx.executeBlocking(() -> { String id = nodeId(((DiscoveryEvent) event).eventNode()); switch (event.type()) { case EVT_NODE_JOINED: notifyNodeListener(listener -> listener.nodeAdded(id)); log.debug("node " + id + " joined the cluster"); - f.complete(); - break; + return null; case EVT_NODE_LEFT: case EVT_NODE_FAILED: if (cleanNodeInfos(id)) { @@ -318,8 +323,7 @@ public void join(Promise promise) { } notifyNodeListener(listener -> listener.nodeLeft(id)); log.debug("node " + id + " left the cluster"); - f.complete(); - break; + return null; case EVT_NODE_SEGMENTED: if (customIgnite || !shutdownOnSegmentation) { log.warn("node got segmented"); @@ -327,10 +331,9 @@ public void join(Promise promise) { log.warn("node got segmented and will be shut down"); vertx.close(); } - f.fail(new IllegalStateException("node is stopped")); - break; + throw new IllegalStateException("node is stopped"); default: - f.fail("event not known"); + throw new IllegalStateException("event not known"); } }); @@ -343,18 +346,18 @@ public void join(Promise promise) { try { MILLISECONDS.sleep(delayAfterStart); - prom.complete(); } catch (InterruptedException e) { - prom.fail(e); + throw new IllegalStateException(e); } } + return null; } }).onComplete(promise); } @Override public void leave(Promise promise) { - vertx.executeBlocking(prom -> { + vertx.executeBlocking(() -> { synchronized (monitor) { if (active) { active = false; @@ -370,12 +373,10 @@ public void leave(Promise promise) { } catch (Exception e) { log.error(e); } - subsMapHelper = null; - nodeInfoMap = null; } } - prom.complete(); + return null; }).onComplete(promise); } @@ -386,39 +387,29 @@ public boolean isActive() { @Override public void addRegistration(String address, RegistrationInfo registrationInfo, Promise promise) { - vertx.executeBlocking(prom -> { - subsMapHelper.put(address, registrationInfo) - .onComplete(prom); - }, false).onComplete(promise); + subsMapHelper.put(address, registrationInfo) + .onComplete(promise); } @Override public void removeRegistration(String address, RegistrationInfo registrationInfo, Promise promise) { - vertx.executeBlocking(prom -> { - subsMapHelper.remove(address, registrationInfo, prom); - }, false).onComplete(promise); + subsMapHelper.remove(address, registrationInfo, promise); } @Override public void getRegistrations(String address, Promise> promise) { - vertx.>executeBlocking(prom -> { - subsMapHelper.get(address, prom); - }, false).onComplete(promise); + subsMapHelper.get(address, promise); } private void cleanSubs(String id) { - try { - subsMapHelper.removeAllForNode(id); - } catch (IllegalStateException | CacheException e) { - //ignore - } + subsMapHelper.removeAllForNode(id); } private boolean cleanNodeInfos(String nid) { try { return nodeInfoMap.remove(nid); } catch (IllegalStateException | CacheException e) { - //ignore + log.warn("Could not remove nodeInfo (" + nid + "): " + e.getMessage()); } return false; } @@ -519,7 +510,7 @@ private CounterImpl(IgniteAtomicLong cnt) { @Override public Future get() { - return vertx.executeBlocking(fut -> fut.complete(cnt.get())); + return vertx.executeBlocking(cnt::get); } @Override @@ -530,7 +521,7 @@ public void get(Handler> handler) { @Override public Future incrementAndGet() { - return vertx.executeBlocking(fut -> fut.complete(cnt.incrementAndGet())); + return vertx.executeBlocking(cnt::incrementAndGet); } @Override @@ -541,7 +532,7 @@ public void incrementAndGet(Handler> handler) { @Override public Future getAndIncrement() { - return vertx.executeBlocking(fut -> fut.complete(cnt.getAndIncrement())); + return vertx.executeBlocking(cnt::getAndIncrement); } @Override @@ -552,7 +543,7 @@ public void getAndIncrement(Handler> handler) { @Override public Future decrementAndGet() { - return vertx.executeBlocking(fut -> fut.complete(cnt.decrementAndGet())); + return vertx.executeBlocking(cnt::decrementAndGet); } @Override @@ -563,7 +554,7 @@ public void decrementAndGet(Handler> handler) { @Override public Future addAndGet(long value) { - return vertx.executeBlocking(fut -> fut.complete(cnt.addAndGet(value))); + return vertx.executeBlocking(() -> cnt.addAndGet(value)); } @Override @@ -574,7 +565,7 @@ public void addAndGet(long value, Handler> handler) { @Override public Future getAndAdd(long value) { - return vertx.executeBlocking(fut -> fut.complete(cnt.getAndAdd(value))); + return vertx.executeBlocking(() -> cnt.getAndAdd(value)); } @Override @@ -585,7 +576,7 @@ public void getAndAdd(long value, Handler> handler) { @Override public Future compareAndSet(long expected, long value) { - return vertx.executeBlocking(fut -> fut.complete(cnt.compareAndSet(expected, value))); + return vertx.executeBlocking(() -> cnt.compareAndSet(expected, value)); } @Override diff --git a/src/main/java/io/vertx/spi/cluster/ignite/impl/AsyncMapImpl.java b/src/main/java/io/vertx/spi/cluster/ignite/impl/AsyncMapImpl.java index cd7b7d7..3198ec6 100644 --- a/src/main/java/io/vertx/spi/cluster/ignite/impl/AsyncMapImpl.java +++ b/src/main/java/io/vertx/spi/cluster/ignite/impl/AsyncMapImpl.java @@ -135,16 +135,16 @@ public Future> values() { @Override public Future> entries() { return vertx.executeBlocking( - promise -> { + () -> { try { List> all = cache.query(new ScanQuery()).getAll(); Map map = new HashMap<>(all.size()); for (Cache.Entry entry : all) { map.put(unmarshal(entry.getKey()), unmarshal(entry.getValue())); } - promise.complete(map); + return map; } catch (final RuntimeException cause) { - promise.fail(new VertxException(cause)); + throw new VertxException(cause); } } ); @@ -163,17 +163,12 @@ private Future executeWithTtl(Function, IgniteFuture : cache; return vertx.executeBlocking( - promise -> { - IgniteFuture future = cacheOp.apply(cache0); - future.listen( - fut -> { - try { - promise.complete(unmarshal(future.get())); - } catch (final RuntimeException e) { - promise.fail(new VertxException(e)); - } - } - ); + () -> { + try { + return unmarshal(cacheOp.apply(cache0).get()); + } catch (final RuntimeException e) { + throw new VertxException(e); + } } ); } diff --git a/src/main/java/io/vertx/spi/cluster/ignite/impl/SubsMapHelper.java b/src/main/java/io/vertx/spi/cluster/ignite/impl/SubsMapHelper.java index 9b7c577..6c531d2 100644 --- a/src/main/java/io/vertx/spi/cluster/ignite/impl/SubsMapHelper.java +++ b/src/main/java/io/vertx/spi/cluster/ignite/impl/SubsMapHelper.java @@ -19,6 +19,8 @@ import io.vertx.core.Promise; import io.vertx.core.VertxException; import io.vertx.core.impl.VertxInternal; +import io.vertx.core.impl.logging.Logger; +import io.vertx.core.impl.logging.LoggerFactory; import io.vertx.core.spi.cluster.NodeSelector; import io.vertx.core.spi.cluster.RegistrationInfo; import io.vertx.core.spi.cluster.RegistrationUpdateEvent; @@ -33,16 +35,15 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import static java.util.stream.Collectors.toList; - /** * @author Thomas Segismont * @author Lukas Prettenthaler */ public class SubsMapHelper { + private static final Logger log = LoggerFactory.getLogger(SubsMapHelper.class); + private final IgniteCache map; private final NodeSelector nodeSelector; private final ConcurrentMap> localSubs = new ConcurrentHashMap<>(); @@ -67,12 +68,11 @@ public void get(String address, Promise> promise) { return; } try { - List infos = map.query(new ScanQuery((k, v) -> k.address().equals(address))) - .getAll().stream() - .map(Cache.Entry::getKey) - .map(IgniteRegistrationInfo::registrationInfo) - .collect(toList()); - int size = infos.size(); + List> remote = map + .query(new ScanQuery((k, v) -> k.address().equals(address))) + .getAll(); + List infos; + int size = remote.size(); Set local = localSubs.get(address); if (local != null) { synchronized (local) { @@ -81,13 +81,18 @@ public void get(String address, Promise> promise) { promise.complete(Collections.emptyList()); return; } + infos = new ArrayList<>(size); infos.addAll(local); } } else if (size == 0) { promise.complete(Collections.emptyList()); return; + } else { + infos = new ArrayList<>(size); + } + for (Cache.Entry info : remote) { + infos.add(info.getKey().registrationInfo()); } - promise.complete(infos); } catch (IllegalStateException | CacheException e) { promise.fail(new VertxException(e)); @@ -127,7 +132,7 @@ public void remove(String address, RegistrationInfo registrationInfo, Promise removeFromSet(registrationInfo, curr)); fireRegistrationUpdateEvent(address); } else { - map.remove(new IgniteRegistrationInfo(address, registrationInfo)); + map.remove(new IgniteRegistrationInfo(address, registrationInfo), Boolean.TRUE); } promise.complete(); } catch (IllegalStateException | CacheException e) { @@ -141,14 +146,15 @@ private Set removeFromSet(RegistrationInfo registrationInfo, S } public void removeAllForNode(String nodeId) { - TreeSet toRemove = map.query(new ScanQuery((k, v) -> k.registrationInfo().nodeId().equals(nodeId))) - .getAll().stream() - .map(Cache.Entry::getKey) - .collect(Collectors.toCollection(TreeSet::new)); - try { - map.removeAll(toRemove); - } catch (IllegalStateException | CacheException t) { - //ignore + List> toRemove = map + .query(new ScanQuery((k, v) -> k.registrationInfo().nodeId().equals(nodeId))) + .getAll(); + for (Cache.Entry info : toRemove) { + try { + map.remove(info.getKey(), Boolean.TRUE); + } catch (IllegalStateException | CacheException t) { + log.warn("Could not remove subscriber: " + t.getMessage()); + } } } @@ -174,12 +180,12 @@ private Future> getAndUpdate(String address) { } private void listen(final Iterable> events, final VertxInternal vertxInternal) { - vertxInternal.>executeBlocking(promise -> { + vertxInternal.>executeBlocking(() -> { StreamSupport.stream(events.spliterator(), false) .map(e -> e.getKey().address()) .distinct() .forEach(this::fireRegistrationUpdateEvent); - promise.complete(); + return null; }); } } diff --git a/src/test/java/io/vertx/spi/cluster/ignite/impl/ThrottlingTest.java b/src/test/java/io/vertx/spi/cluster/ignite/impl/ThrottlingTest.java index ee44ea3..1227dbb 100644 --- a/src/test/java/io/vertx/spi/cluster/ignite/impl/ThrottlingTest.java +++ b/src/test/java/io/vertx/spi/cluster/ignite/impl/ThrottlingTest.java @@ -48,7 +48,7 @@ public void testInterval() throws Exception { Throttling throttling = new Throttling((VertxInternal) vertx, address -> { events.compute(address, (k, v) -> { if (v == null) { - v = Collections.synchronizedList(new LinkedList<>()); + v = Collections.synchronizedList(Collections.synchronizedList(new LinkedList<>())); } v.add(System.nanoTime()); return v; diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index ad977b6..d21466a 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -22,6 +22,7 @@ +