From 2eb68f44d7943c54dbad1e9aa37551d75bc19be5 Mon Sep 17 00:00:00 2001 From: Amin Date: Tue, 8 Mar 2022 14:41:47 +0330 Subject: [PATCH 1/4] Add health check configurable on init --- .../nebula/client/graph/NebulaPoolConfig.java | 20 +++++++++--- .../nebula/client/graph/net/NebulaPool.java | 10 ++++-- .../graph/net/RoundRobinLoadBalancer.java | 31 ++++++++++++------- .../client/graph/net/TestConnectionPool.java | 1 + 4 files changed, 44 insertions(+), 18 deletions(-) diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/NebulaPoolConfig.java b/client/src/main/java/com/vesoft/nebula/client/graph/NebulaPoolConfig.java index d3e308902..168424e67 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/NebulaPoolConfig.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/NebulaPoolConfig.java @@ -26,16 +26,19 @@ public class NebulaPoolConfig implements Serializable { // 0 means never delete private int idleTime = 0; - // the interval time to check idle connection, unit ms, -1 means no check + // The interval time to check idle connection, unit ms, -1 means no check private int intervalIdle = -1; - // the wait time to get idle connection, unit ms + // The wait time to get idle connection, unit ms private int waitTime = 0; - // set to true to turn on ssl encrypted traffic + // The minimum rate of healthy servers to all servers. if 1 it means all servers should be available on init. + private double minClusterHealthRate = 1; + + // Set to true to turn on ssl encrypted traffic private boolean enableSsl = false; - // ssl param is required if ssl is turned on + // SSL param is required if ssl is turned on private SSLParam sslParam = null; public boolean isEnableSsl() { @@ -107,4 +110,13 @@ public NebulaPoolConfig setWaitTime(int waitTime) { this.waitTime = waitTime; return this; } + + public double getMinClusterHealthRate() { + return minClusterHealthRate; + } + + public NebulaPoolConfig setMinClusterHealthRate(double minClusterHealthRate) { + this.minClusterHealthRate = minClusterHealthRate; + return this; + } } diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java b/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java index f2734cba1..c53f41399 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java @@ -72,6 +72,11 @@ private void checkConfig(NebulaPoolConfig config) { throw new InvalidConfigException( "Config waitTime:" + config.getWaitTime() + " is illegal"); } + + if (config.getWaitTime() < 0) { + throw new InvalidConfigException( + "Config waitTime:" + config.getWaitTime() + " is illegal"); + } } /** @@ -90,8 +95,9 @@ public boolean init(List addresses, NebulaPoolConfig config) this.waitTime = config.getWaitTime(); List newAddrs = hostToIp(addresses); this.loadBalancer = config.isEnableSsl() - ? new RoundRobinLoadBalancer(newAddrs, config.getTimeout(), config.getSslParam()) - : new RoundRobinLoadBalancer(newAddrs, config.getTimeout()); + ? new RoundRobinLoadBalancer(newAddrs, config.getTimeout(), config.getSslParam(), + config.getMinClusterHealthRate()) + : new RoundRobinLoadBalancer(newAddrs, config.getTimeout(), config.getMinClusterHealthRate()); ConnObjectPool objectPool = new ConnObjectPool(this.loadBalancer, config); this.objectPool = new GenericObjectPool<>(objectPool); GenericObjectPoolConfig objConfig = new GenericObjectPoolConfig(); diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/net/RoundRobinLoadBalancer.java b/client/src/main/java/com/vesoft/nebula/client/graph/net/RoundRobinLoadBalancer.java index 6865fafeb..bd121c7d0 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/net/RoundRobinLoadBalancer.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/net/RoundRobinLoadBalancer.java @@ -21,24 +21,27 @@ public class RoundRobinLoadBalancer implements LoadBalancer { private static final int S_BAD = 1; private final List addresses = new ArrayList<>(); private final Map serversStatus = new ConcurrentHashMap<>(); + private final double minClusterHealthRate; private final int timeout; private final AtomicInteger pos = new AtomicInteger(0); - private final int delayTime = 60; // unit seconds + private final int delayTime = 60; // Unit seconds private final ScheduledExecutorService schedule = Executors.newScheduledThreadPool(1); private SSLParam sslParam; private boolean enabledSsl; - public RoundRobinLoadBalancer(List addresses, int timeout) { + public RoundRobinLoadBalancer(List addresses, int timeout, double minClusterHealthRate) { this.timeout = timeout; for (HostAddress addr : addresses) { this.addresses.add(addr); this.serversStatus.put(addr, S_BAD); } + this.minClusterHealthRate = minClusterHealthRate; schedule.scheduleAtFixedRate(this::scheduleTask, 0, delayTime, TimeUnit.SECONDS); } - public RoundRobinLoadBalancer(List addresses, int timeout, SSLParam sslParam) { - this(addresses, timeout); + public RoundRobinLoadBalancer(List addresses, int timeout, SSLParam sslParam, + double minClusterHealthRate) { + this(addresses, timeout, minClusterHealthRate); this.sslParam = sslParam; this.enabledSsl = true; } @@ -63,11 +66,11 @@ public HostAddress getAddress() { } public void updateServersStatus() { - for (HostAddress addr : addresses) { - if (ping(addr)) { - serversStatus.put(addr, S_OK); + for (HostAddress hostAddress : addresses) { + if (ping(hostAddress)) { + serversStatus.put(hostAddress, S_OK); } else { - serversStatus.put(addr, S_BAD); + serversStatus.put(hostAddress, S_BAD); } } } @@ -93,12 +96,16 @@ public boolean ping(HostAddress addr) { public boolean isServersOK() { this.updateServersStatus(); - for (HostAddress addr : addresses) { - if (serversStatus.get(addr) == S_BAD) { - return false; + double numServersWithOkStatus = 0; + for (HostAddress hostAddress : addresses) { + if (serversStatus.get(hostAddress) == S_OK) { + numServersWithOkStatus++; } } - return true; + + // Check health rate. + double okServersRate = numServersWithOkStatus / addresses.size(); + return okServersRate >= minClusterHealthRate; } private void scheduleTask() { diff --git a/client/src/test/java/com/vesoft/nebula/client/graph/net/TestConnectionPool.java b/client/src/test/java/com/vesoft/nebula/client/graph/net/TestConnectionPool.java index 77b2bd5d6..4a9e72059 100644 --- a/client/src/test/java/com/vesoft/nebula/client/graph/net/TestConnectionPool.java +++ b/client/src/test/java/com/vesoft/nebula/client/graph/net/TestConnectionPool.java @@ -128,6 +128,7 @@ public void testGetSession() { // set idle time nebulaPoolConfig.setIdleTime(2000); nebulaPoolConfig.setIntervalIdle(1000); + nebulaPoolConfig.setMinClusterHealthRate(1); // set wait time nebulaPoolConfig.setWaitTime(1000); List addresses = Collections.singletonList( From 50385bbb9ca6b4a6889534de891674e19370436c Mon Sep 17 00:00:00 2001 From: Amin Date: Tue, 8 Mar 2022 14:51:13 +0330 Subject: [PATCH 2/4] Checks on NebulaConfig edited --- .../java/com/vesoft/nebula/client/graph/net/NebulaPool.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java b/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java index c53f41399..381485510 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java @@ -73,9 +73,9 @@ private void checkConfig(NebulaPoolConfig config) { "Config waitTime:" + config.getWaitTime() + " is illegal"); } - if (config.getWaitTime() < 0) { + if (config.getMinClusterHealthRate() < 0) { throw new InvalidConfigException( - "Config waitTime:" + config.getWaitTime() + " is illegal"); + "Config minClusterHealthRate:" + config.getMinClusterHealthRate() + " is illegal"); } } From 43a2ebca9c3a4be6979fd4dc713e86f1b293b8f7 Mon Sep 17 00:00:00 2001 From: Amin Date: Sun, 13 Mar 2022 10:20:56 +0330 Subject: [PATCH 3/4] Resolve lint problems --- .../com/vesoft/nebula/client/graph/NebulaPoolConfig.java | 3 ++- .../java/com/vesoft/nebula/client/graph/net/NebulaPool.java | 6 ++++-- .../nebula/client/graph/net/RoundRobinLoadBalancer.java | 3 ++- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/NebulaPoolConfig.java b/client/src/main/java/com/vesoft/nebula/client/graph/NebulaPoolConfig.java index 168424e67..bccb9a58c 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/NebulaPoolConfig.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/NebulaPoolConfig.java @@ -32,7 +32,8 @@ public class NebulaPoolConfig implements Serializable { // The wait time to get idle connection, unit ms private int waitTime = 0; - // The minimum rate of healthy servers to all servers. if 1 it means all servers should be available on init. + // The minimum rate of healthy servers to all servers. if 1 it means all servers should be + // available on init. private double minClusterHealthRate = 1; // Set to true to turn on ssl encrypted traffic diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java b/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java index 381485510..490b2081a 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java @@ -75,7 +75,8 @@ private void checkConfig(NebulaPoolConfig config) { if (config.getMinClusterHealthRate() < 0) { throw new InvalidConfigException( - "Config minClusterHealthRate:" + config.getMinClusterHealthRate() + " is illegal"); + "Config minClusterHealthRate:" + config.getMinClusterHealthRate() + + " is illegal"); } } @@ -97,7 +98,8 @@ public boolean init(List addresses, NebulaPoolConfig config) this.loadBalancer = config.isEnableSsl() ? new RoundRobinLoadBalancer(newAddrs, config.getTimeout(), config.getSslParam(), config.getMinClusterHealthRate()) - : new RoundRobinLoadBalancer(newAddrs, config.getTimeout(), config.getMinClusterHealthRate()); + : new RoundRobinLoadBalancer(newAddrs, config.getTimeout(), + config.getMinClusterHealthRate()); ConnObjectPool objectPool = new ConnObjectPool(this.loadBalancer, config); this.objectPool = new GenericObjectPool<>(objectPool); GenericObjectPoolConfig objConfig = new GenericObjectPoolConfig(); diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/net/RoundRobinLoadBalancer.java b/client/src/main/java/com/vesoft/nebula/client/graph/net/RoundRobinLoadBalancer.java index bd121c7d0..1b3f3f3c2 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/net/RoundRobinLoadBalancer.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/net/RoundRobinLoadBalancer.java @@ -29,7 +29,8 @@ public class RoundRobinLoadBalancer implements LoadBalancer { private SSLParam sslParam; private boolean enabledSsl; - public RoundRobinLoadBalancer(List addresses, int timeout, double minClusterHealthRate) { + public RoundRobinLoadBalancer(List addresses, int timeout, + double minClusterHealthRate) { this.timeout = timeout; for (HostAddress addr : addresses) { this.addresses.add(addr); From 5ef3b4ceae3d418cc618e334b73ddc8b14404744 Mon Sep 17 00:00:00 2001 From: Amin Maghsodi Date: Tue, 29 Mar 2022 11:36:05 +0430 Subject: [PATCH 4/4] Update TestConnectionPool.java --- .../com/vesoft/nebula/client/graph/net/TestConnectionPool.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/test/java/com/vesoft/nebula/client/graph/net/TestConnectionPool.java b/client/src/test/java/com/vesoft/nebula/client/graph/net/TestConnectionPool.java index 4a9e72059..a7a88d991 100644 --- a/client/src/test/java/com/vesoft/nebula/client/graph/net/TestConnectionPool.java +++ b/client/src/test/java/com/vesoft/nebula/client/graph/net/TestConnectionPool.java @@ -128,7 +128,7 @@ public void testGetSession() { // set idle time nebulaPoolConfig.setIdleTime(2000); nebulaPoolConfig.setIntervalIdle(1000); - nebulaPoolConfig.setMinClusterHealthRate(1); + nebulaPoolConfig.setMinClusterHealthRate(0); // set wait time nebulaPoolConfig.setWaitTime(1000); List addresses = Collections.singletonList(